Преглед изворни кода

feat(stream): add simple stream over process message queue

Andrew Mayorov пре 2 година
родитељ
комит
f92b5b3f32

+ 13 - 0
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -20,6 +20,7 @@
 -export([
     empty/0,
     list/1,
+    mqueue/1,
     map/2,
     chain/2
 ]).
@@ -59,6 +60,18 @@ list([]) ->
 list([X | Rest]) ->
     fun() -> [X | list(Rest)] end.
 
+%% @doc Make a stream out of process message queue.
+-spec mqueue(timeout()) -> stream(any()).
+mqueue(Timeout) ->
+    fun() ->
+        receive
+            X ->
+                [X | mqueue(Timeout)]
+        after Timeout ->
+            []
+        end
+    end.
+
 %% @doc Make a stream by applying a function to each element of the underlying stream.
 -spec map(fun((X) -> Y), stream(X)) -> stream(Y).
 map(F, S) ->

+ 9 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -73,3 +73,12 @@ chain_list_map_test() ->
         ["1", "2", "3", "4", "5", "6"],
         emqx_utils_stream:consume(S)
     ).
+
+mqueue_test() ->
+    _ = erlang:send_after(1, self(), 1),
+    _ = erlang:send_after(100, self(), 2),
+    _ = erlang:send_after(20, self(), 42),
+    ?assertEqual(
+        [1, 42, 2],
+        emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
+    ).