|
@@ -293,6 +293,21 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
|
|
|
%% Internal functions
|
|
%% Internal functions
|
|
|
%%================================================================================
|
|
%%================================================================================
|
|
|
|
|
|
|
|
|
|
+%% Loop context:
|
|
|
|
|
+-record(ctx, {
|
|
|
|
|
+ shard,
|
|
|
|
|
+ %% Generation runtime state
|
|
|
|
|
+ s,
|
|
|
|
|
+ %% RocksDB iterators:
|
|
|
|
|
+ iters,
|
|
|
|
|
+ %% Cached topic structure for the static index:
|
|
|
|
|
+ topic_structure,
|
|
|
|
|
+ %% Maximum time:
|
|
|
|
|
+ tmax,
|
|
|
|
|
+ %% Compressed topic filter, split into words:
|
|
|
|
|
+ filter
|
|
|
|
|
+}).
|
|
|
|
|
+
|
|
|
get_streams(Trie, TopicFilter) ->
|
|
get_streams(Trie, TopicFilter) ->
|
|
|
lists:map(
|
|
lists:map(
|
|
|
fun({Static, _Varying}) ->
|
|
fun({Static, _Varying}) ->
|
|
@@ -316,34 +331,19 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg
|
|
|
emqx_ds_msg_serializer:serialize(SSchema, Msg).
|
|
emqx_ds_msg_serializer:serialize(SSchema, Msg).
|
|
|
|
|
|
|
|
enrich(
|
|
enrich(
|
|
|
- Shard,
|
|
|
|
|
- #s{trie = Trie, with_guid = WithGuid},
|
|
|
|
|
|
|
+ #ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}},
|
|
|
DSKey,
|
|
DSKey,
|
|
|
- StaticKey,
|
|
|
|
|
Msg0
|
|
Msg0
|
|
|
) ->
|
|
) ->
|
|
|
- case emqx_ds_lts:reverse_lookup(Trie, StaticKey) of
|
|
|
|
|
- {ok, Structure} ->
|
|
|
|
|
- %% Reconstruct the original topic from the static topic
|
|
|
|
|
- %% index and varying parts:
|
|
|
|
|
- Topic = emqx_topic:join(
|
|
|
|
|
- emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))
|
|
|
|
|
- ),
|
|
|
|
|
- Msg0#message{
|
|
|
|
|
- topic = Topic,
|
|
|
|
|
- id =
|
|
|
|
|
- case WithGuid of
|
|
|
|
|
- true -> Msg0#message.id;
|
|
|
|
|
- false -> fake_guid(Shard, DSKey)
|
|
|
|
|
- end
|
|
|
|
|
- };
|
|
|
|
|
- undefined ->
|
|
|
|
|
- Err = #{
|
|
|
|
|
- msg => "LTS trie missing key",
|
|
|
|
|
- key => StaticKey
|
|
|
|
|
- },
|
|
|
|
|
- throw({unrecoverable, Err})
|
|
|
|
|
- end.
|
|
|
|
|
|
|
+ Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))),
|
|
|
|
|
+ Msg0#message{
|
|
|
|
|
+ topic = Topic,
|
|
|
|
|
+ id =
|
|
|
|
|
+ case WithGuid of
|
|
|
|
|
+ true -> Msg0#message.id;
|
|
|
|
|
+ false -> fake_guid(Shard, DSKey)
|
|
|
|
|
+ end
|
|
|
|
|
+ }.
|
|
|
|
|
|
|
|
deserialize(
|
|
deserialize(
|
|
|
#s{serialization_schema = SSchema},
|
|
#s{serialization_schema = SSchema},
|
|
@@ -437,15 +437,41 @@ do_init_iterators(S, Static, [], _WildcardLevel) ->
|
|
|
}
|
|
}
|
|
|
].
|
|
].
|
|
|
|
|
|
|
|
-next_loop(Shard, S, It = #it{ts = TS}, Iterators, BatchSize, TMax) ->
|
|
|
|
|
- next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, []).
|
|
|
|
|
|
|
+next_loop(
|
|
|
|
|
+ Shard,
|
|
|
|
|
+ S = #s{trie = Trie},
|
|
|
|
|
+ It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF},
|
|
|
|
|
+ Iterators,
|
|
|
|
|
+ BatchSize,
|
|
|
|
|
+ TMax
|
|
|
|
|
+) ->
|
|
|
|
|
+ TopicStructure =
|
|
|
|
|
+ case emqx_ds_lts:reverse_lookup(Trie, StaticIdx) of
|
|
|
|
|
+ {ok, Rev} ->
|
|
|
|
|
+ Rev;
|
|
|
|
|
+ undefined ->
|
|
|
|
|
+ throw(#{
|
|
|
|
|
+ msg => "LTS trie missing key",
|
|
|
|
|
+ key => StaticIdx
|
|
|
|
|
+ })
|
|
|
|
|
+ end,
|
|
|
|
|
+ Ctx = #ctx{
|
|
|
|
|
+ shard = Shard,
|
|
|
|
|
+ s = S,
|
|
|
|
|
+ iters = Iterators,
|
|
|
|
|
+ topic_structure = TopicStructure,
|
|
|
|
|
+ filter = words(CompressedTF),
|
|
|
|
|
+ tmax = TMax
|
|
|
|
|
+ },
|
|
|
|
|
+ next_loop(Ctx, It, BatchSize, {seek, TS}, []).
|
|
|
|
|
|
|
|
-next_loop(_Shard, _S, It, _Iterators, 0, _TMax, Op, Acc) ->
|
|
|
|
|
|
|
+next_loop(_Ctx, It, 0, Op, Acc) ->
|
|
|
finalize_loop(It, Op, Acc);
|
|
finalize_loop(It, Op, Acc);
|
|
|
-next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) ->
|
|
|
|
|
|
|
+next_loop(Ctx, It0, BatchSize, Op, Acc) ->
|
|
|
%% ?tp(notice, skipstream_loop, #{
|
|
%% ?tp(notice, skipstream_loop, #{
|
|
|
%% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op
|
|
%% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op
|
|
|
%% }),
|
|
%% }),
|
|
|
|
|
+ #ctx{s = S, tmax = TMax, iters = Iterators} = Ctx,
|
|
|
#it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0,
|
|
#it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0,
|
|
|
case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of
|
|
case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of
|
|
|
none ->
|
|
none ->
|
|
@@ -463,12 +489,12 @@ next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) ->
|
|
|
{seek, TS} ->
|
|
{seek, TS} ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
|
|
%% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
|
|
|
It = It0#it{ts = TS},
|
|
It = It0#it{ts = TS},
|
|
|
- next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, Acc);
|
|
|
|
|
|
|
+ next_loop(Ctx, It, BatchSize, {seek, TS}, Acc);
|
|
|
{ok, TS, DSKey, Msg0} ->
|
|
{ok, TS, DSKey, Msg0} ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => ok, ts => TS, key => Key}),
|
|
%% ?tp(notice, skipstream_loop_result, #{r => ok, ts => TS, key => Key}),
|
|
|
- Message = enrich(Shard, S, DSKey, StaticIdx, Msg0),
|
|
|
|
|
|
|
+ Message = enrich(Ctx, DSKey, Msg0),
|
|
|
It = It0#it{ts = TS},
|
|
It = It0#it{ts = TS},
|
|
|
- next_loop(Shard, S, It, Iterators, BatchSize - 1, TMax, next, [{DSKey, Message} | Acc])
|
|
|
|
|
|
|
+ next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc])
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
finalize_loop(It0, Op, Acc) ->
|
|
finalize_loop(It0, Op, Acc) ->
|