|
|
@@ -371,7 +371,15 @@ update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0
|
|
|
|
|
|
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
|
|
next(DB, Iter, N) ->
|
|
|
- with_worker(do_next, [DB, Iter, N]).
|
|
|
+ {ok, Ref} = anext(DB, Iter, N),
|
|
|
+ receive
|
|
|
+ #ds_async_result{ref = Ref, data = Data} ->
|
|
|
+ Data
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec anext(emqx_ds:db(), iterator(), pos_integer()) -> {ok, reference()}.
|
|
|
+anext(DB, Iter, N) ->
|
|
|
+ emqx_ds_lib:anext_helper(?MODULE, do_next, [DB, Iter, N]).
|
|
|
|
|
|
-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
|
|
[emqx_ds:ds_specific_delete_stream()].
|
|
|
@@ -412,7 +420,10 @@ make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartT
|
|
|
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
|
|
emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
|
|
|
delete_next(DB, Iter, Selector, N) ->
|
|
|
- with_worker(do_delete_next, [DB, Iter, Selector, N]).
|
|
|
+ {ok, Ref} = emqx_ds_lib:anext_helper(?MODULE, do_delete_next, [DB, Iter, Selector, N]),
|
|
|
+ receive
|
|
|
+ #ds_async_result{ref = Ref, data = Data} -> Data
|
|
|
+ end.
|
|
|
|
|
|
%%================================================================================
|
|
|
%% Internal exports
|
|
|
@@ -466,20 +477,3 @@ timeus_to_timestamp(undefined) ->
|
|
|
undefined;
|
|
|
timeus_to_timestamp(TimestampUs) ->
|
|
|
TimestampUs div 1000.
|
|
|
-
|
|
|
-with_worker(F, A) ->
|
|
|
- Parent = self(),
|
|
|
- Ref = make_ref(),
|
|
|
- {_Pid, MRef} = spawn_opt(
|
|
|
- fun() ->
|
|
|
- Parent ! {Ref, apply(?MODULE, F, A)}
|
|
|
- end,
|
|
|
- [monitor, {min_heap_size, 10000}]
|
|
|
- ),
|
|
|
- receive
|
|
|
- {Ref, Result} ->
|
|
|
- erlang:demonitor(MRef, [flush]),
|
|
|
- Result;
|
|
|
- {'DOWN', MRef, _, _, _, Info} ->
|
|
|
- {error, unrecoverable, Info}
|
|
|
- end.
|