Explorar o código

chore(retainer): scan table in batches, improve stream usage

Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>
Ilya Averyanov hai 1 ano
pai
achega
1a664c941b

+ 17 - 26
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -68,6 +68,8 @@
 -define(REINDEX_RPC_RETRY_INTERVAL, 1000).
 -define(REINDEX_INDEX_UPDATE_WAIT, 30000).
 
+-define(MESSAGE_SCAN_BATCH_SIZE, 100).
+
 %%--------------------------------------------------------------------
 %% Management API
 %%--------------------------------------------------------------------
@@ -207,7 +209,7 @@ delete_message(_State, Topic) ->
 read_message(_State, Topic) ->
     {ok, read_messages(Topic)}.
 
-match_messages(_State, Topic, undefined) ->
+match_messages(State, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
     S = msg_stream(search_stream(Tokens, Now)),
@@ -215,7 +217,7 @@ match_messages(_State, Topic, undefined) ->
         all_remaining ->
             {ok, emqx_utils_stream:consume(S), undefined};
         BatchNum when is_integer(BatchNum) ->
-            match_messages(undefined, Topic, {S, BatchNum})
+            match_messages(State, Topic, {S, BatchNum})
     end;
 match_messages(_State, _Topic, {S0, BatchNum}) ->
     case emqx_utils_stream:consume(BatchNum, S0) of
@@ -246,16 +248,12 @@ page_read(_State, Topic, Page, Limit) ->
         )
     ),
     NSkip = (Page - 1) * Limit,
-    case emqx_utils_stream:consume(NSkip, S1) of
-        {_, S2} ->
-            case emqx_utils_stream:consume(Limit, S2) of
-                {Rows, _S3} ->
-                    {ok, true, Rows};
-                Rows when is_list(Rows) ->
-                    {ok, false, Rows}
-            end;
+    S2 = emqx_utils_stream:drop(NSkip, S1),
+    case emqx_utils_stream:consume(Limit, S2) of
+        {Rows, _S3} ->
+            {ok, true, Rows};
         Rows when is_list(Rows) ->
-            {ok, false, []}
+            {ok, false, Rows}
     end.
 
 clean(_) ->
@@ -362,24 +360,17 @@ search_stream(Index, FilterTokens, Now) ->
         fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
         TopicStream
     ),
-    LookedUpRetainMsgStream = emqx_utils_stream:map(
-        fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end,
+    RetainMsgStream = emqx_utils_stream:chainmap(
+        fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end,
         MatchingTopicStream
     ),
-    FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter(
-        fun
-            ([#retained_message{expiry_time = ExpiryTime}]) ->
-                ExpiryTime =:= 0 orelse ExpiryTime >= Now;
-            ([]) ->
-                false
+    ValidRetainMsgStream = emqx_utils_stream:filter(
+        fun(#retained_message{expiry_time = ExpiryTime}) ->
+            ExpiryTime =:= 0 orelse ExpiryTime > Now
         end,
-        LookedUpRetainMsgStream
-    ),
-    RetainMsgStream = emqx_utils_stream:map(
-        fun([RetainedMsg]) -> RetainedMsg end,
-        FoundAndValidLookedUpRetainMsgStream
+        RetainMsgStream
     ),
-    RetainMsgStream.
+    ValidRetainMsgStream.
 
 match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
 match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
@@ -708,7 +699,7 @@ are_indices_updated(Indices) ->
 ets_stream(Tab) ->
     emqx_utils_stream:ets(
         fun
-            (undefined) -> ets:match_object(Tab, '_', 1);
+            (undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE);
             (Cont) -> ets:match_object(Cont)
         end
     ).

+ 1 - 4
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -242,10 +242,7 @@ interleave_stop_test() ->
 ets_test() ->
     T = ets:new(tab, [ordered_set]),
     Objects = [{N, N} || N <- lists:seq(1, 10)],
-    lists:foreach(
-        fun(Object) -> ets:insert(T, Object) end,
-        Objects
-    ),
+    ets:insert(T, Objects),
     S = emqx_utils_stream:ets(
         fun
             (undefined) -> ets:match_object(T, '_', 4);