Przeglądaj źródła

feat(ds): Add async next API

ieQu1 1 rok temu
rodzic
commit
8afb88cf72

+ 5 - 0
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -40,4 +40,9 @@
     filters = #{}
 }).
 
+-record(ds_async_result, {
+    ref :: reference(),
+    data :: emqx_ds:next_result()
+}).
+
 -endif.

+ 21 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -39,7 +39,7 @@
 -export([store_batch/2, store_batch/3]).
 
 %% Message replay API:
--export([get_streams/3, make_iterator/4, update_iterator/3, next/3]).
+-export([get_streams/3, make_iterator/4, update_iterator/3, next/3, anext/3]).
 
 %% Message delete API:
 -export([get_delete_streams/3, make_delete_iterator/4, delete_next/4]).
@@ -263,6 +263,14 @@
 
 -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
 
+%% Asynchronous next. Backend must reply to the calling process with
+%% `#ds_async_result{}' message, where `ref' field is equal to the
+%% returned reference.
+%%
+%% Reference is a process alias that can be unalised to ignore the
+%% result.
+-callback anext(db(), _Iterator, pos_integer()) -> {ok, reference()}.
+
 -callback get_delete_streams(db(), topic_filter(), time()) -> [ds_specific_delete_stream()].
 
 -callback make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) ->
@@ -281,6 +289,8 @@
     make_delete_iterator/4,
     delete_next/4,
 
+    anext/3,
+
     count/1
 ]).
 
@@ -414,6 +424,16 @@ update_iterator(DB, OldIter, DSKey) ->
 next(DB, Iter, BatchSize) ->
     ?module(DB):next(DB, Iter, BatchSize).
 
+-spec anext(db(), iterator(), pos_integer()) -> {ok, reference()}.
+anext(DB, Iter, BatchSize) ->
+    Mod = ?module(DB),
+    case erlang:function_exported(Mod, anext, 3) of
+        true ->
+            Mod:anext(DB, Iter, BatchSize);
+        false ->
+            emqx_ds_lib:anext_helper(Mod, next, [DB, Iter, BatchSize])
+    end.
+
 -spec get_delete_streams(db(), topic_filter(), time()) -> [delete_stream()].
 get_delete_streams(DB, TopicFilter, StartTime) ->
     Mod = ?module(DB),

+ 70 - 0
apps/emqx_durable_storage/src/emqx_ds_lib.erl

@@ -0,0 +1,70 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_lib).
+
+-include("emqx_ds.hrl").
+
+%% API:
+-export([anext_helper/3]).
+
+%% behavior callbacks:
+-export([]).
+
+%% internal exports:
+-export([]).
+
+-export_type([]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+anext_helper(Mod, Function, Args) ->
+    ReplyTo = alias([reply]),
+    spawn_opt(
+        fun() ->
+            Result =
+                try
+                    apply(Mod, Function, Args)
+                catch
+                    EC:Err:Stack ->
+                        {error, unrecoverable, #{
+                            msg => ?FUNCTION_NAME,
+                            EC => Err,
+                            stacktrace => Stack
+                        }}
+                end,
+            ReplyTo ! #ds_async_result{ref = ReplyTo, data = Result}
+        end,
+        [link, {min_heap_size, 10000}]
+    ),
+    {ok, ReplyTo}.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================