|
|
@@ -43,6 +43,7 @@
|
|
|
|
|
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
+-include("emqx_ds_metrics.hrl").
|
|
|
|
|
|
-ifdef(TEST).
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
@@ -260,7 +261,8 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
|
|
|
{ok, OldIter#it{ts = TS}}
|
|
|
end.
|
|
|
|
|
|
-next({_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) ->
|
|
|
+next(ShardId = {_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) ->
|
|
|
+ init_counters(),
|
|
|
Iterators = init_iterators(S, It),
|
|
|
%% ?tp(notice, skipstream_init_iters, #{it => It, its => Iterators}),
|
|
|
try
|
|
|
@@ -271,7 +273,8 @@ next({_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) ->
|
|
|
Result
|
|
|
end
|
|
|
after
|
|
|
- free_iterators(Iterators)
|
|
|
+ free_iterators(Iterators),
|
|
|
+ collect_counters(ShardId)
|
|
|
end.
|
|
|
|
|
|
delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
|
|
|
@@ -447,12 +450,15 @@ next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) ->
|
|
|
case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of
|
|
|
none ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => none}),
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_EOS),
|
|
|
finalize_loop(It0, Op, Acc);
|
|
|
{seek, TS} when TS > TMax ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}),
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
|
|
|
finalize_loop(It0, {seek, TS}, Acc);
|
|
|
{ok, TS, _Key, _Msg0} when TS > TMax ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}),
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
|
|
|
finalize_loop(It0, {seek, TS}, Acc);
|
|
|
{seek, TS} ->
|
|
|
%% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
|
|
|
@@ -479,8 +485,10 @@ next_step(
|
|
|
Result =
|
|
|
case Op of
|
|
|
next ->
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_NEXT),
|
|
|
rocksdb:iterator_move(IH, next);
|
|
|
{seek, TS} ->
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_SEEK),
|
|
|
rocksdb:iterator_move(IH, {seek, mk_key(StaticIdx, N, Hash, TS)})
|
|
|
end,
|
|
|
case Result of
|
|
|
@@ -508,10 +516,12 @@ next_step(
|
|
|
Msg0 = deserialize(S, Blob),
|
|
|
case emqx_topic:match(Msg0#message.topic, CompressedTF) of
|
|
|
true ->
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_HIT),
|
|
|
{ok, NextTS, Key, Msg0};
|
|
|
false ->
|
|
|
%% Hash collision. Advance to the
|
|
|
%% next timestamp:
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_HASH_COLLISION),
|
|
|
{seek, NextTS + 1}
|
|
|
end;
|
|
|
_ ->
|
|
|
@@ -519,6 +529,7 @@ next_step(
|
|
|
next_step(S, StaticIdx, CompressedTF, Iterators, NextTS, {seek, NextTS})
|
|
|
end;
|
|
|
NextTS when NextTS > ExpectedTS, N > 0 ->
|
|
|
+ inc_counter(?DS_SKIPSTREAM_LTS_MISS),
|
|
|
%% Next index level is not what we expect.
|
|
|
{seek, NextTS}
|
|
|
end
|
|
|
@@ -672,3 +683,31 @@ words(<<>>) ->
|
|
|
[];
|
|
|
words(Bin) ->
|
|
|
emqx_topic:words(Bin).
|
|
|
+
|
|
|
+%%%%%%%% Counters %%%%%%%%%%
|
|
|
+
|
|
|
+-define(COUNTERS, [
|
|
|
+ ?DS_SKIPSTREAM_LTS_SEEK,
|
|
|
+ ?DS_SKIPSTREAM_LTS_NEXT,
|
|
|
+ ?DS_SKIPSTREAM_LTS_HASH_COLLISION,
|
|
|
+ ?DS_SKIPSTREAM_LTS_HIT,
|
|
|
+ ?DS_SKIPSTREAM_LTS_MISS,
|
|
|
+ ?DS_SKIPSTREAM_LTS_FUTURE,
|
|
|
+ ?DS_SKIPSTREAM_LTS_EOS
|
|
|
+]).
|
|
|
+
|
|
|
+inc_counter(Counter) ->
|
|
|
+ N = get(Counter),
|
|
|
+ put(Counter, N + 1).
|
|
|
+
|
|
|
+init_counters() ->
|
|
|
+ _ = [put(I, 0) || I <- ?COUNTERS],
|
|
|
+ ok.
|
|
|
+
|
|
|
+collect_counters(Shard) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(Key) ->
|
|
|
+ emqx_ds_builtin_metrics:collect_shard_counter(Shard, Key, get(Key))
|
|
|
+ end,
|
|
|
+ ?COUNTERS
|
|
|
+ ).
|