@@ -38,6 +38,20 @@
 %% Type declarations
 %%================================================================================

+%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
+%% records over the wire.
+
+%% tags:
+-define(stream, stream).
+-define(it, it).
+
+%% keys:
+-define(tag, 1).
+-define(topic_filter, 2).
+-define(start_time, 3).
+-define(storage_key, 4).
+-define(last_seen_key, 5).
+
 -type options() ::
     #{
         bits_per_wildcard_level => pos_integer(),
@@ -65,18 +79,20 @@

 -type s() :: #s{}.

--record(stream, {
-    storage_key :: emqx_ds_lts:msg_storage_key()
-}).
-
--record(it, {
-    topic_filter :: emqx_ds:topic_filter(),
-    start_time :: emqx_ds:time(),
-    storage_key :: emqx_ds_lts:msg_storage_key(),
-    last_seen_key = <<>> :: binary()
-}).
+-type stream() ::
+    #{
+        ?tag := ?stream,
+        ?storage_key := emqx_ds_lts:msg_storage_key()
+    }.

--type iterator() :: #it{}.
+-type iterator() ::
+    #{
+        ?tag := ?it,
+        ?topic_filter := emqx_ds:topic_filter(),
+        ?start_time := emqx_ds:time(),
+        ?storage_key := emqx_ds_lts:msg_storage_key(),
+        ?last_seen_key := binary()
+    }.

 -define(COUNTER, emqx_ds_storage_bitfield_lts_counter).

@@ -170,18 +186,35 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
         Messages
     ).

+-spec get_streams(
+    emqx_ds_replication_layer:shard_id(),
+    s(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) -> [stream()].
 get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
-    [#stream{storage_key = I} || I <- Indexes].
+    [#{?tag => ?stream, ?storage_key => I} || I <- Indexes].

-make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) ->
+-spec make_iterator(
+    emqx_ds_replication_layer:shard_id(),
+    s(),
+    stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) -> {ok, iterator()}.
+make_iterator(
+    _Shard, _Data, #{?tag := ?stream, ?storage_key := StorageKey}, TopicFilter, StartTime
+) ->
     %% Note: it's a good idea to keep the iterator structure lean,
     %% since it can be stored on a remote node that could update its
     %% code independently from us.
-    {ok, #it{
-        topic_filter = TopicFilter,
-        start_time = StartTime,
-        storage_key = StorageKey
+    {ok, #{
+        ?tag => ?it,
+        ?topic_filter => TopicFilter,
+        ?start_time => StartTime,
+        ?storage_key => StorageKey,
+        ?last_seen_key => <<>>
     }}.

 next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
@@ -192,16 +225,19 @@
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     next_until(Schema, It, SafeCutoffTime, BatchSize).

-next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime ->
+next_until(_Schema, It = #{?tag := ?it, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
+    StartTime >= SafeCutoffTime
+->
     %% We're in the middle of the current epoch, so we can't yet iterate over it.
     %% It would be unsafe otherwise: messages can be stored in the current epoch
     %% concurrently with iterating over it. They can end up earlier (in the iteration
     %% order) due to the nature of keymapping, potentially causing us to miss them.
     {ok, It, []};
 next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
-    #it{
-        start_time = StartTime,
-        storage_key = {TopicIndex, Varying}
+    #{
+        ?tag := ?it,
+        ?start_time := StartTime,
+        ?storage_key := {TopicIndex, Varying}
     } = It,
     %% Make filter:
     Inequations = [
@@ -250,7 +286,7 @@ next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
 next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
     inc_counter(),
-    #it{last_seen_key = Key0} = It0,
+    #{?tag := ?it, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
             {ok, It0, lists:reverse(Acc0)};
@@ -268,7 +304,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
     end.

 traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
-    It = It0#it{last_seen_key = Key},
+    It = It0#{?last_seen_key := Key},
     case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
         true ->
             Msg = deserialize(Val),
@@ -310,7 +346,7 @@ check_message(
     overflow;
 check_message(
     _Cutoff,
-    #it{start_time = StartTime, topic_filter = TopicFilter},
+    #{?tag := ?it, ?start_time := StartTime, ?topic_filter := TopicFilter},
     #message{timestamp = Timestamp, topic = Topic}
 ) when Timestamp >= StartTime ->
     emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
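
Note (illustration only, not part of the patch): the sketch below contrasts the removed record with the new integer-keyed map, to make explicit why the change makes streams and iterators safer to persist and to send between nodes. The module name, demo/0, and the re-declared #it{} record are hypothetical and exist only for comparison; the key macros mirror the ones added by the patch.

    -module(map_keys_sketch).

    -export([demo/0]).

    %% Hypothetical copy of the removed record, kept only for comparison:
    -record(it, {topic_filter, start_time, storage_key, last_seen_key = <<>>}).

    %% Same "record" macros as in the patch:
    -define(it, it).
    -define(tag, 1).
    -define(topic_filter, 2).
    -define(start_time, 3).
    -define(storage_key, 4).
    -define(last_seen_key, 5).

    demo() ->
        TopicFilter = [<<"t">>, '+'],
        %% A record serializes as a positional tuple {it, TF, ST, SK, LSK}:
        %% a node compiled against a different field order (or an extra
        %% field) silently misreads a persisted or received term.
        Rec = #it{topic_filter = TopicFilter, start_time = 0, storage_key = {1, []}},
        %% The map carries explicit keys, so nodes running different code
        %% versions can still exchange terms; small integer keys keep the
        %% encoding far more compact than atom or binary keys would.
        Map = #{
            ?tag => ?it,
            ?topic_filter => TopicFilter,
            ?start_time => 0,
            ?storage_key => {1, []},
            ?last_seen_key => <<>>
        },
        %% Return both serialized sizes for comparison.
        {byte_size(term_to_binary(Rec)), byte_size(term_to_binary(Map))}.

demo/0 returns the two serialized sizes side by side; the integer keys add only a small constant per term compared to the record, which is the trade-off the comment at the top of the patch refers to.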