|
|
@@ -42,8 +42,8 @@
|
|
|
%% records over the wire.
|
|
|
|
|
|
%% tags:
|
|
|
--define(stream, stream).
|
|
|
--define(it, it).
|
|
|
+-define(STREAM, 1).
|
|
|
+-define(IT, 2).
|
|
|
|
|
|
%% keys:
|
|
|
-define(tag, 1).
|
|
|
@@ -81,13 +81,13 @@
|
|
|
|
|
|
-type stream() ::
|
|
|
#{
|
|
|
- ?tag := ?stream,
|
|
|
+ ?tag := ?STREAM,
|
|
|
?storage_key := emqx_ds_lts:msg_storage_key()
|
|
|
}.
|
|
|
|
|
|
-type iterator() ::
|
|
|
#{
|
|
|
- ?tag := ?it,
|
|
|
+ ?tag := ?IT,
|
|
|
?topic_filter := emqx_ds:topic_filter(),
|
|
|
?start_time := emqx_ds:time(),
|
|
|
?storage_key := emqx_ds_lts:msg_storage_key(),
|
|
|
@@ -194,7 +194,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
|
|
|
) -> [stream()].
|
|
|
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
|
|
Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
|
|
|
- [#{?tag => ?stream, ?storage_key => I} || I <- Indexes].
|
|
|
+ [#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes].
|
|
|
|
|
|
-spec make_iterator(
|
|
|
emqx_ds_storage_layer:shard_id(),
|
|
|
@@ -204,13 +204,13 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
|
|
|
emqx_ds:time()
|
|
|
) -> {ok, iterator()}.
|
|
|
make_iterator(
|
|
|
- _Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime
|
|
|
+ _Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime
|
|
|
) ->
|
|
|
%% Note: it's a good idea to keep the iterator structure lean,
|
|
|
%% since it can be stored on a remote node that could update its
|
|
|
%% code independently from us.
|
|
|
{ok, #{
|
|
|
- ?tag => ?it,
|
|
|
+ ?tag => ?IT,
|
|
|
?topic_filter => TopicFilter,
|
|
|
?start_time => StartTime,
|
|
|
?storage_key => StorageKey,
|
|
|
@@ -225,7 +225,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
|
|
|
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
|
|
|
next_until(Schema, It, SafeCutoffTime, BatchSize).
|
|
|
|
|
|
-next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
|
|
|
+next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
|
|
|
StartTime >= SafeCutoffTime
|
|
|
->
|
|
|
%% We're in the middle of the current epoch, so we can't yet iterate over it.
|
|
|
@@ -235,7 +235,7 @@ next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTim
|
|
|
{ok, It, []};
|
|
|
next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
|
|
|
#{
|
|
|
- ?tag := ?it,
|
|
|
+ ?tag := ?IT,
|
|
|
?start_time := StartTime,
|
|
|
?storage_key := {TopicIndex, Varying}
|
|
|
} = It,
|
|
|
@@ -286,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
|
|
|
{ok, It, lists:reverse(Acc)};
|
|
|
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
|
|
|
inc_counter(),
|
|
|
- #{?tag := ?it, ?last_seen_key := Key0} = It0,
|
|
|
+ #{?tag := ?IT, ?last_seen_key := Key0} = It0,
|
|
|
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
|
|
|
overflow ->
|
|
|
{ok, It0, lists:reverse(Acc0)};
|
|
|
@@ -346,7 +346,7 @@ check_message(
|
|
|
overflow;
|
|
|
check_message(
|
|
|
_Cutoff,
|
|
|
- #{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter},
|
|
|
+ #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
|
|
|
#message{timestamp = Timestamp, topic = Topic}
|
|
|
) when Timestamp >= StartTime ->
|
|
|
emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
|