|
|
@@ -46,6 +46,12 @@
|
|
|
shard_of_message/4
|
|
|
]).
|
|
|
|
|
|
+%% Internal exports:
|
|
|
+-export([
|
|
|
+ do_next/3,
|
|
|
+ do_delete_next/4
|
|
|
+]).
|
|
|
+
|
|
|
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
|
|
|
|
|
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
@@ -295,19 +301,8 @@ update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0
|
|
|
end.
|
|
|
|
|
|
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
|
|
-next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
|
|
|
- ShardId = {DB, Shard},
|
|
|
- T0 = erlang:monotonic_time(microsecond),
|
|
|
- Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)),
|
|
|
- T1 = erlang:monotonic_time(microsecond),
|
|
|
- emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
|
|
|
- case Result of
|
|
|
- {ok, StorageIter, Batch} ->
|
|
|
- Iter = Iter0#{?enc := StorageIter},
|
|
|
- {ok, Iter, Batch};
|
|
|
- Other ->
|
|
|
- Other
|
|
|
- end.
|
|
|
+next(DB, Iter, N) ->
|
|
|
+ with_worker(do_next, [DB, Iter, N]).
|
|
|
|
|
|
-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
|
|
[emqx_ds:ds_specific_delete_stream()].
|
|
|
@@ -347,7 +342,36 @@ make_delete_iterator(DB, ?delete_stream(Shard, InnerStream), TopicFilter, StartT
|
|
|
|
|
|
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
|
|
emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
|
|
|
-delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N) ->
|
|
|
+delete_next(DB, Iter, Selector, N) ->
|
|
|
+ with_worker(do_delete_next, [DB, Iter, Selector, N]).
|
|
|
+
|
|
|
+%%================================================================================
|
|
|
+%% Internal exports
|
|
|
+%%================================================================================
|
|
|
+
|
|
|
+current_timestamp(ShardId) ->
|
|
|
+ emqx_ds_builtin_local_meta:current_timestamp(ShardId).
|
|
|
+
|
|
|
+-spec do_next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
|
|
|
+do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
|
|
|
+ ShardId = {DB, Shard},
|
|
|
+ T0 = erlang:monotonic_time(microsecond),
|
|
|
+ Result = emqx_ds_storage_layer:next(ShardId, StorageIter0, N, current_timestamp(ShardId)),
|
|
|
+ T1 = erlang:monotonic_time(microsecond),
|
|
|
+ emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
|
|
|
+ case Result of
|
|
|
+ {ok, StorageIter, Batch} ->
|
|
|
+ Iter = Iter0#{?enc := StorageIter},
|
|
|
+ {ok, Iter, Batch};
|
|
|
+ Other ->
|
|
|
+ Other
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
|
|
+ emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
|
|
|
+do_delete_next(
|
|
|
+ DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
|
|
|
+) ->
|
|
|
ShardId = {DB, Shard},
|
|
|
case
|
|
|
emqx_ds_storage_layer:delete_next(
|
|
|
@@ -362,13 +386,6 @@ delete_next(DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIte
|
|
|
Error
|
|
|
end.
|
|
|
|
|
|
-%%================================================================================
|
|
|
-%% Internal exports
|
|
|
-%%================================================================================
|
|
|
-
|
|
|
-current_timestamp(ShardId) ->
|
|
|
- emqx_ds_builtin_local_meta:current_timestamp(ShardId).
|
|
|
-
|
|
|
%%================================================================================
|
|
|
%% Internal functions
|
|
|
%%================================================================================
|
|
|
@@ -380,3 +397,20 @@ timeus_to_timestamp(undefined) ->
|
|
|
undefined;
|
|
|
timeus_to_timestamp(TimestampUs) ->
|
|
|
TimestampUs div 1000.
|
|
|
+
|
|
|
+with_worker(F, A) ->
|
|
|
+ Parent = self(),
|
|
|
+ Ref = make_ref(),
|
|
|
+ {_Pid, MRef} = spawn_opt(
|
|
|
+ fun() ->
|
|
|
+ Parent ! {Ref, apply(?MODULE, F, A)}
|
|
|
+ end,
|
|
|
+ [monitor, {min_heap_size, 10000}]
|
|
|
+ ),
|
|
|
+ receive
|
|
|
+ {Ref, Result} ->
|
|
|
+ erlang:demonitor(MRef, [flush]),
|
|
|
+ Result;
|
|
|
+ {'DOWN', MRef, _, _, _, Info} ->
|
|
|
+ {error, unrecoverable, Info}
|
|
|
+ end.
|