Bladeren bron

feat(utils-stream): add a few more stream combinators

Andrew Mayorov 1 jaar geleden
bovenliggende
commit
d30c99512a
2 gewijzigde bestanden met toevoegingen van 115 en 1 verwijderingen
  1. 41 1
      apps/emqx_utils/src/emqx_utils_stream.erl
  2. 74 0
      apps/emqx_utils/test/emqx_utils_stream_tests.erl

+ 41 - 1
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -22,7 +22,9 @@
     list/1,
     mqueue/1,
     map/2,
-    chain/2
+    transpose/1,
+    chain/2,
+    repeat/1
 ]).
 
 %% Evaluating
@@ -91,6 +93,31 @@ map(F, S) ->
         end
     end.
 
+%% @doc Transpose a list of streams into a stream producing lists of their respective values.
+%% The resulting stream is as long as the shortest of the input streams.
+-spec transpose([stream(X)]) -> stream([X]).
+transpose([S]) ->
+    map(fun(X) -> [X] end, S);
+transpose([S | Streams]) ->
+    transpose_tail(S, transpose(Streams));
+transpose([]) ->
+    empty().
+
+transpose_tail(S, Tail) ->
+    fun() ->
+        case next(S) of
+            [X | SRest] ->
+                case next(Tail) of
+                    [Xs | TailRest] ->
+                        [[X | Xs] | transpose_tail(SRest, TailRest)];
+                    [] ->
+                        []
+                end;
+            [] ->
+                []
+        end
+    end.
+
 %% @doc Make a stream by chaining (concatenating) two streams.
 %% The second stream begins to produce values only after the first one is exhausted.
 -spec chain(stream(X), stream(Y)) -> stream(X | Y).
@@ -104,6 +131,19 @@ chain(SFirst, SThen) ->
         end
     end.
 
+%% @doc Make an infinite stream out of repeats of given stream.
+%% If the given stream is empty, the resulting stream is also empty.
+-spec repeat(stream(X)) -> stream(X).
+repeat(S) ->
+    fun() ->
+        case next(S) of
+            [X | SRest] ->
+                [X | chain(SRest, repeat(S))];
+            [] ->
+                []
+        end
+    end.
+
 %%
 
 %% @doc Produce the next value from the stream.

+ 74 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -74,6 +74,80 @@ chain_list_map_test() ->
         emqx_utils_stream:consume(S)
     ).
 
+transpose_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7])
+    ]),
+    ?assertEqual(
+        [[1, 4], [2, 5], [3, 6]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_none_test() ->
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(emqx_utils_stream:transpose([]))
+    ).
+
+transpose_one_test() ->
+    S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]),
+    ?assertEqual(
+        [[1], [2], [3]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_many_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7]),
+        emqx_utils_stream:list([8, 9])
+    ]),
+    ?assertEqual(
+        [[1, 4, 8], [2, 5, 9]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_many_empty_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7]),
+        emqx_utils_stream:empty()
+    ]),
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(S)
+    ).
+
+repeat_test() ->
+    S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
+    ?assertMatch(
+        {[1, 2, 3, 1, 2, 3, 1, 2], _},
+        emqx_utils_stream:consume(8, S)
+    ),
+    {_, SRest} = emqx_utils_stream:consume(8, S),
+    ?assertMatch(
+        {[3, 1, 2, 3, 1, 2, 3, 1], _},
+        emqx_utils_stream:consume(8, SRest)
+    ).
+
+repeat_empty_test() ->
+    S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(8, S)
+    ).
+
+transpose_repeat_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
+        emqx_utils_stream:list([4, 5, 6, 7, 8])
+    ]),
+    ?assertEqual(
+        [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
+        emqx_utils_stream:consume(S)
+    ).
+
 mqueue_test() ->
     _ = erlang:send_after(1, self(), 1),
     _ = erlang:send_after(100, self(), 2),