emqx_utils_stream.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream).
  17. %% Constructors / Combinators
  18. -export([
  19. empty/0,
  20. list/1,
  21. const/1,
  22. mqueue/1,
  23. map/2,
  24. transpose/1,
  25. chain/1,
  26. chain/2,
  27. repeat/1,
  28. interleave/2,
  29. limit_length/2,
  30. filter/2,
  31. drop/2,
  32. chainmap/2
  33. ]).
  34. %% Evaluating
  35. -export([
  36. next/1,
  37. consume/1,
  38. consume/2,
  39. foreach/2
  40. ]).
  41. %% Streams from ETS tables
  42. -export([
  43. ets/1
  44. ]).
  45. %% Streams from .csv data
  46. -export([
  47. csv/1,
  48. csv/2
  49. ]).
  50. -export_type([stream/1, csv_parse_opts/0]).
  51. %% @doc A stream is essentially a lazy list.
  52. -type stream_tail(T) :: fun(() -> next(T) | []).
  53. -type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T).
  54. -type next(T) :: nonempty_improper_list(T, stream_tail(T)).
  55. -type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}.
  56. -dialyzer(no_improper_lists).
  57. -elvis([{elvis_style, nesting_level, disable}]).
  58. %%
  59. %% @doc Make a stream that produces no values.
  60. -spec empty() -> stream(none()).
  61. empty() ->
  62. [].
  63. %% @doc Make a stream out of the given list.
  64. %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`.
  65. -spec list([T]) -> stream(T).
  66. list(L) -> L.
  67. %% @doc Make a stream with a single element infinitely repeated
  68. -spec const(T) -> stream(T).
  69. const(T) ->
  70. fun() -> [T | const(T)] end.
  71. %% @doc Make a stream out of process message queue.
  72. -spec mqueue(timeout()) -> stream(any()).
  73. mqueue(Timeout) ->
  74. fun() ->
  75. receive
  76. X ->
  77. [X | mqueue(Timeout)]
  78. after Timeout ->
  79. []
  80. end
  81. end.
  82. %% @doc Make a stream by applying a function to each element of the underlying stream.
  83. -spec map(fun((X) -> Y), stream(X)) -> stream(Y).
  84. map(F, S) ->
  85. fun() ->
  86. case next(S) of
  87. [X | Rest] ->
  88. [F(X) | map(F, Rest)];
  89. [] ->
  90. []
  91. end
  92. end.
  93. %% @doc Make a stream by filtering the underlying stream with a predicate function.
  94. filter(F, S) ->
  95. FilterNext = fun FilterNext(St) ->
  96. case next(St) of
  97. [X | Rest] ->
  98. case F(X) of
  99. true ->
  100. [X | filter(F, Rest)];
  101. false ->
  102. FilterNext(Rest)
  103. end;
  104. [] ->
  105. []
  106. end
  107. end,
  108. fun() -> FilterNext(S) end.
  109. %% @doc Consumes the stream and applies the given function to each element.
  110. foreach(F, S) ->
  111. case next(S) of
  112. [X | Rest] ->
  113. F(X),
  114. foreach(F, Rest);
  115. [] ->
  116. ok
  117. end.
  118. %% @doc Drops N first elements from the stream
  119. -spec drop(non_neg_integer(), stream(T)) -> stream(T).
  120. drop(N, S) ->
  121. DropNext = fun DropNext(M, St) ->
  122. case next(St) of
  123. [_X | Rest] when M > 0 ->
  124. DropNext(M - 1, Rest);
  125. Next ->
  126. Next
  127. end
  128. end,
  129. fun() -> DropNext(N, S) end.
  130. %% @doc Stream version of flatmap.
  131. -spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y).
  132. chainmap(F, S) ->
  133. ChainNext = fun ChainNext(St) ->
  134. case next(St) of
  135. [X | Rest] ->
  136. case next(F(X)) of
  137. [Y | YRest] ->
  138. [Y | chain(YRest, chainmap(F, Rest))];
  139. [] ->
  140. ChainNext(Rest)
  141. end;
  142. [] ->
  143. []
  144. end
  145. end,
  146. fun() -> ChainNext(S) end.
  147. %% @doc Transpose a list of streams into a stream producing lists of their respective values.
  148. %% The resulting stream is as long as the shortest of the input streams.
  149. -spec transpose([stream(X)]) -> stream([X]).
  150. transpose([S]) ->
  151. map(fun(X) -> [X] end, S);
  152. transpose([S | Streams]) ->
  153. transpose_tail(S, transpose(Streams));
  154. transpose([]) ->
  155. empty().
  156. transpose_tail(S, Tail) ->
  157. fun() ->
  158. case next(S) of
  159. [X | SRest] ->
  160. case next(Tail) of
  161. [Xs | TailRest] ->
  162. [[X | Xs] | transpose_tail(SRest, TailRest)];
  163. [] ->
  164. []
  165. end;
  166. [] ->
  167. []
  168. end
  169. end.
  170. %% @doc Make a stream by concatenating multiple streams.
  171. -spec chain([stream(X)]) -> stream(X).
  172. chain(L) ->
  173. lists:foldr(fun chain/2, empty(), L).
  174. %% @doc Make a stream by chaining (concatenating) two streams.
  175. %% The second stream begins to produce values only after the first one is exhausted.
  176. -spec chain(stream(X), stream(Y)) -> stream(X | Y).
  177. chain(SFirst, SThen) ->
  178. fun() ->
  179. case next(SFirst) of
  180. [X | SRest] ->
  181. [X | chain(SRest, SThen)];
  182. [] ->
  183. next(SThen)
  184. end
  185. end.
  186. %% @doc Make an infinite stream out of repeats of given stream.
  187. %% If the given stream is empty, the resulting stream is also empty.
  188. -spec repeat(stream(X)) -> stream(X).
  189. repeat(S) ->
  190. fun() ->
  191. case next(S) of
  192. [X | SRest] ->
  193. [X | chain(SRest, repeat(S))];
  194. [] ->
  195. []
  196. end
  197. end.
  198. %% @doc Interleave the elements of the streams.
  199. %%
  200. %% This function accepts a list of tuples where the first element
  201. %% specifies size of the "batch" to be consumed from the stream at a
  202. %% time (stream is the second tuple element). If element of the list
  203. %% is a plain stream, then the batch size is assumed to be 1.
  204. %%
  205. %% If `ContinueAtEmpty' is `false', and one of the streams returns
  206. %% `[]', then the function will return `[]' as well. Otherwise, it
  207. %% will continue consuming data from the remaining streams.
  208. -spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X).
  209. interleave(L0, ContinueAtEmpty) ->
  210. L = lists:map(
  211. fun
  212. (Stream) when is_function(Stream) or is_list(Stream) ->
  213. {1, Stream};
  214. (A = {N, _}) when N >= 0 ->
  215. A
  216. end,
  217. L0
  218. ),
  219. fun() ->
  220. do_interleave(ContinueAtEmpty, 0, L, [])
  221. end.
  222. %% @doc Truncate list to the given length
  223. -spec limit_length(non_neg_integer(), stream(X)) -> stream(X).
  224. limit_length(0, _) ->
  225. fun() -> [] end;
  226. limit_length(N, S) when N >= 0 ->
  227. fun() ->
  228. case next(S) of
  229. [] ->
  230. [];
  231. [X | S1] ->
  232. [X | limit_length(N - 1, S1)]
  233. end
  234. end.
  235. %%
  236. %% @doc Produce the next value from the stream.
  237. -spec next(stream(T)) -> next(T) | [].
  238. next(EvalNext) when is_function(EvalNext) ->
  239. EvalNext();
  240. next([_ | _Rest] = EvaluatedNext) ->
  241. EvaluatedNext;
  242. next([]) ->
  243. [].
  244. %% @doc Consume the stream and return a list of all produced values.
  245. -spec consume(stream(T)) -> [T].
  246. consume(S) ->
  247. case next(S) of
  248. [X | SRest] ->
  249. [X | consume(SRest)];
  250. [] ->
  251. []
  252. end.
  253. %% @doc Consume N values from the stream and return a list of them and the rest of the stream.
  254. %% If the stream is exhausted before N values are produced, return just a list of these values.
  255. -spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
  256. consume(N, S) ->
  257. consume(N, S, []).
  258. consume(0, S, Acc) ->
  259. {lists:reverse(Acc), S};
  260. consume(N, S, Acc) ->
  261. case next(S) of
  262. [X | SRest] ->
  263. consume(N - 1, SRest, [X | Acc]);
  264. [] ->
  265. lists:reverse(Acc)
  266. end.
  267. %%
  268. -type select_result(Record, Cont) ::
  269. {[Record], Cont}
  270. | {[Record], '$end_of_table'}
  271. | '$end_of_table'.
  272. %% @doc Make a stream out of an ETS table, where the ETS table is scanned through in chunks,
  273. %% with the given continuation function. The function is assumed to return a result of a call to:
  274. %% * `ets:select/1` / `ets:select/3`
  275. %% * `ets:match/1` / `ets:match/3`
  276. %% * `ets:match_object/1` / `ets:match_object/3`
  277. -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
  278. ets(ContF) when is_function(ContF) ->
  279. ets(undefined, ContF).
  280. ets(Cont, ContF) ->
  281. fun() ->
  282. case ContF(Cont) of
  283. {Records, '$end_of_table'} ->
  284. next(Records);
  285. {Records, NCont} ->
  286. next(chain(Records, ets(NCont, ContF)));
  287. '$end_of_table' ->
  288. []
  289. end
  290. end.
  291. %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
  292. %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
  293. -spec csv(binary()) -> stream(map()).
  294. csv(Bin) ->
  295. csv(Bin, #{}).
  296. -spec csv(binary(), csv_parse_opts()) -> stream(map()).
  297. csv(Bin, Opts) when is_binary(Bin) ->
  298. Liner =
  299. case Opts of
  300. #{nullable := true} ->
  301. fun csv_read_nullable_line/1;
  302. _ ->
  303. fun csv_read_line/1
  304. end,
  305. Maper =
  306. case Opts of
  307. #{filter_null := true} ->
  308. fun(Headers, Fields) ->
  309. maps:from_list(
  310. lists:filter(
  311. fun({_, Value}) ->
  312. Value =/= undefined
  313. end,
  314. lists:zip(Headers, Fields)
  315. )
  316. )
  317. end;
  318. _ ->
  319. fun(Headers, Fields) ->
  320. maps:from_list(lists:zip(Headers, Fields))
  321. end
  322. end,
  323. Reader = fun _Iter(Headers, Lines) ->
  324. case Liner(Lines) of
  325. {Fields, Rest} ->
  326. case length(Fields) == length(Headers) of
  327. true ->
  328. User = Maper(Headers, Fields),
  329. [User | fun() -> _Iter(Headers, Rest) end];
  330. false ->
  331. error(bad_format)
  332. end;
  333. eof ->
  334. []
  335. end
  336. end,
  337. HeadersAndLines = binary:split(Bin, [<<"\r">>, <<"\n">>], [global, trim_all]),
  338. case csv_read_line(HeadersAndLines) of
  339. {CSVHeaders, CSVLines} ->
  340. fun() -> Reader(CSVHeaders, CSVLines) end;
  341. eof ->
  342. empty()
  343. end.
  344. csv_read_line([Line | Lines]) ->
  345. %% XXX: not support ' ' for the field value
  346. Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
  347. {Fields, Lines};
  348. csv_read_line([]) ->
  349. eof.
  350. csv_read_nullable_line([Line | Lines]) ->
  351. %% XXX: not support ' ' for the field value
  352. Fields = lists:map(
  353. fun(Bin) ->
  354. case string:trim(Bin, both) of
  355. <<>> ->
  356. undefined;
  357. Any ->
  358. Any
  359. end
  360. end,
  361. binary:split(Line, [<<",">>], [global])
  362. ),
  363. {Fields, Lines};
  364. csv_read_nullable_line([]) ->
  365. eof.
  366. do_interleave(_Cont, _, [], []) ->
  367. [];
  368. do_interleave(Cont, N, [{N, S} | Rest], Rev) ->
  369. do_interleave(Cont, 0, Rest, [{N, S} | Rev]);
  370. do_interleave(Cont, _, [], Rev) ->
  371. do_interleave(Cont, 0, lists:reverse(Rev), []);
  372. do_interleave(Cont, I, [{N, S} | Rest], Rev) when I < N ->
  373. case next(S) of
  374. [] when Cont ->
  375. do_interleave(Cont, 0, Rest, Rev);
  376. [] ->
  377. [];
  378. [X | S1] ->
  379. [
  380. X
  381. | fun() ->
  382. do_interleave(Cont, I + 1, [{N, S1} | Rest], Rev)
  383. end
  384. ]
  385. end.