|
|
@@ -156,7 +156,9 @@
|
|
|
-define(DIM_TOPIC, 1).
|
|
|
-define(DIM_TS, 2).
|
|
|
|
|
|
--define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
|
|
|
+-define(DS_LTS_COUNTERS, [
|
|
|
+ ?DS_BITFIELD_LTS_SEEK_COUNTER, ?DS_BITFIELD_LTS_NEXT_COUNTER, ?DS_BITFIELD_LTS_COLLISION_COUNTER
|
|
|
+]).
|
|
|
|
|
|
%% GVar used for idle detection:
|
|
|
-define(IDLE_DETECT, idle_detect).
|
|
|
@@ -637,7 +639,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
|
|
|
Key1 ->
|
|
|
%% assert
|
|
|
true = Key1 > Key0,
|
|
|
- inc_counter(?DS_LTS_SEEK_COUNTER),
|
|
|
+ inc_counter(?DS_BITFIELD_LTS_SEEK_COUNTER),
|
|
|
case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
|
|
|
{ok, Key, Val} ->
|
|
|
{N, It, Acc} = traverse_interval(
|
|
|
@@ -663,7 +665,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
|
|
|
Acc = [{Key, Msg} | Acc0],
|
|
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
|
|
|
false ->
|
|
|
- inc_counter(?DS_LTS_COLLISION_COUNTER),
|
|
|
+ inc_counter(?DS_BITFIELD_LTS_COLLISION_COUNTER),
|
|
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
|
|
|
end;
|
|
|
overflow ->
|
|
|
@@ -675,7 +677,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
|
|
|
traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
|
|
|
{0, It, Acc};
|
|
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
|
|
|
- inc_counter(?DS_LTS_NEXT_COUNTER),
|
|
|
+ inc_counter(?DS_BITFIELD_LTS_NEXT_COUNTER),
|
|
|
case rocksdb:iterator_move(ITHandle, next) of
|
|
|
{ok, Key, Val} ->
|
|
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
|
|
|
@@ -695,7 +697,7 @@ delete_next_loop(LoopContext0) ->
|
|
|
iterated_over := AccIter0,
|
|
|
it_handle := ITHandle
|
|
|
} = LoopContext0,
|
|
|
- inc_counter(?DS_LTS_SEEK_COUNTER),
|
|
|
+ inc_counter(?DS_BITFIELD_LTS_SEEK_COUNTER),
|
|
|
#{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0,
|
|
|
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
|
|
|
overflow ->
|
|
|
@@ -777,7 +779,7 @@ delete_traverse_interval1(LoopContext0) ->
|
|
|
iterated_over := AccIter,
|
|
|
storage_iter := It
|
|
|
} = LoopContext0,
|
|
|
- inc_counter(?DS_LTS_NEXT_COUNTER),
|
|
|
+ inc_counter(?DS_BITFIELD_LTS_NEXT_COUNTER),
|
|
|
case rocksdb:iterator_move(ITHandle, next) of
|
|
|
{ok, Key, Val} ->
|
|
|
delete_traverse_interval(LoopContext0#{
|
|
|
@@ -940,9 +942,11 @@ init_counters() ->
|
|
|
ok.
|
|
|
|
|
|
report_counters(Shard) ->
|
|
|
- emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)),
|
|
|
- emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)),
|
|
|
- emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)),
|
|
|
+ emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_BITFIELD_LTS_SEEK_COUNTER)),
|
|
|
+ emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_BITFIELD_LTS_NEXT_COUNTER)),
|
|
|
+ emqx_ds_builtin_metrics:inc_lts_collision_counter(
|
|
|
+ Shard, get(?DS_BITFIELD_LTS_COLLISION_COUNTER)
|
|
|
+ ),
|
|
|
_ = [erase(I) || I <- ?DS_LTS_COUNTERS],
|
|
|
ok.
|
|
|
|