|
@@ -304,7 +304,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
|
|
-spec make_iterator(
|
|
-spec make_iterator(
|
|
|
emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time()
|
|
emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time()
|
|
|
) ->
|
|
) ->
|
|
|
- emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()).
|
|
|
|
|
|
|
+ emqx_ds:make_iterator_result(iterator()).
|
|
|
make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
|
|
make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
|
|
|
ShardId = {DB, Shard},
|
|
ShardId = {DB, Shard},
|
|
|
case
|
|
case
|
|
@@ -318,7 +318,7 @@ make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
|
|
|
Error
|
|
Error
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) ->
|
|
|
|
|
|
|
+-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
|
|
|
emqx_ds:make_iterator_result(iterator()).
|
|
emqx_ds:make_iterator_result(iterator()).
|
|
|
update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) ->
|
|
update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) ->
|
|
|
case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of
|
|
case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of
|
|
@@ -396,7 +396,7 @@ do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
|
-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
|
|
- emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
|
|
|
|
|
|
|
+ emqx_ds:delete_next_result(delete_iterator()).
|
|
|
do_delete_next(
|
|
do_delete_next(
|
|
|
DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
|
|
DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
|
|
|
) ->
|
|
) ->
|