
feat(ds): Report counters for LTS storage layout

ieQu1 1 year ago
Parent
Commit
94ca7ad0f8

+ 2 - 2
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -13,7 +13,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--ifndef(EMQX_DS_HRL_HRL).
--define(EMQX_DS_HRL_HRL, true).
+-ifndef(EMQX_DS_HRL).
+-define(EMQX_DS_HRL, true).
 
 -endif.

+ 49 - 0
apps/emqx_durable_storage/include/emqx_ds_metrics.hrl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_METRICS_HRL).
+-define(EMQX_DS_METRICS_HRL, true).
+
+%%%% Egress metrics:
+
+%% Number of successfully flushed batches:
+-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches).
+%% Number of batch flush retries:
+-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry).
+%% Number of batches that weren't flushed due to unrecoverable errors:
+-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed).
+%% Total number of messages that were successfully committed to the storage:
+-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages).
+%% Total size of payloads that were successfully committed to the storage:
+-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes).
+%% Sliding average of flush time (microseconds):
+-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time).
+
+%%%% Storage layer metrics:
+-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time).
+-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time).
+
+%%% LTS Storage counters:
+
+%% This counter is incremented when the iterator seeks to the next interval:
+-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek).
+%% This counter is incremented when the iterator proceeds to the next
+%% key within the interval (this is the best-case scenario):
+-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next).
+%% This counter is incremented when the key passes bitmask check, but
+%% the value is rejected by the subsequent post-processing:
+-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision).
+
+-endif.
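
The macros above expand to plain atoms; keeping them in one header lets the producer side (emqx_ds_builtin_metrics, which feeds them to emqx_metrics_worker) and the consumer side (emqx_prometheus) agree on the names. A minimal sketch of how a module that includes the header can bump one of the DB-level LTS counters, assuming only calls already visible in this commit; in practice the wrapper functions added to emqx_ds_builtin_metrics below are the intended entry point, and the module name here is hypothetical:

    %% Illustration only: ds_metrics_usage_sketch is not part of the commit.
    -module(ds_metrics_usage_sketch).
    -include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl").
    -export([bump_seek/1]).

    %% 'emqx_ds_builtin_metrics' is the metrics worker name (?WORKER = ?MODULE
    %% in that module); DB-level metrics such as the LTS counters are keyed by
    %% the DB name.
    bump_seek(DB) ->
        catch emqx_metrics_worker:inc(emqx_ds_builtin_metrics, DB, ?DS_LTS_SEEK_COUNTER).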

+ 38 - 17
apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl

@@ -32,7 +32,11 @@
 
     observe_store_batch_time/2,
 
-    observe_next_time/2
+    observe_next_time/2,
+
+    inc_lts_seek_counter/2,
+    inc_lts_next_counter/2,
+    inc_lts_collision_counter/2
 ]).
 
 %% behavior callbacks:
@@ -43,6 +47,8 @@
 
 -export_type([shard_metrics_id/0]).
 
+-include("emqx_ds_metrics.hrl").
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -50,22 +56,25 @@
 -define(WORKER, ?MODULE).
 
 -define(STORAGE_LAYER_METRICS, [
-    {slide, 'emqx_ds_store_batch_time'}
+    {slide, ?DS_STORE_BATCH_TIME},
+    {counter, ?DS_LTS_SEEK_COUNTER},
+    {counter, ?DS_LTS_NEXT_COUNTER},
+    {counter, ?DS_LTS_COLLISION_COUNTER}
 ]).
 
 -define(FETCH_METRICS, [
-    {slide, 'emqx_ds_builtin_next_time'}
+    {slide, ?DS_BUILTIN_NEXT_TIME}
 ]).
 
 -define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
 
 -define(EGRESS_METRICS, [
-    {counter, 'emqx_ds_egress_batches'},
-    {counter, 'emqx_ds_egress_batches_retry'},
-    {counter, 'emqx_ds_egress_batches_failed'},
-    {counter, 'emqx_ds_egress_messages'},
-    {counter, 'emqx_ds_egress_bytes'},
-    {slide, 'emqx_ds_egress_flush_time'}
+    {counter, ?DS_EGRESS_BATCHES},
+    {counter, ?DS_EGRESS_BATCHES_RETRY},
+    {counter, ?DS_EGRESS_BATCHES_FAILED},
+    {counter, ?DS_EGRESS_MESSAGES},
+    {counter, ?DS_EGRESS_BYTES},
+    {slide, ?DS_EGRESS_FLUSH_TIME}
 ]).
 
 -define(SHARD_METRICS, ?EGRESS_METRICS).
@@ -99,45 +108,57 @@ init_for_shard(ShardId) ->
 %% @doc Increase the number of successfully flushed batches
 -spec inc_egress_batches(shard_metrics_id()) -> ok.
 inc_egress_batches(Id) ->
-    catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches').
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES).
 
 %% @doc Increase the number of times the egress worker had to retry
 %% flushing the batch
 -spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
 inc_egress_batches_retry(Id) ->
-    catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry').
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY).
 
 %% @doc Increase the number of times the egress worker encountered an
 %% unrecoverable error while trying to flush the batch
 -spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
 inc_egress_batches_failed(Id) ->
-    catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed').
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED).
 
 %% @doc Increase the number of messages successfully saved to the shard
 -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
 inc_egress_messages(Id, NMessages) ->
-    catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages).
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages).
 
 %% @doc Increase the number of bytes successfully saved to the shard
 -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
 inc_egress_bytes(Id, NMessages) ->
-    catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages).
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages).
 
 %% @doc Add a sample of elapsed time spent flushing the egress to the
 %% Raft log (in microseconds)
 -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
 observe_egress_flush_time(Id, FlushTime) ->
-    catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
+    catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime).
 
 -spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
 observe_store_batch_time({DB, _}, StoreTime) ->
-    catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime).
+    catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_STORE_BATCH_TIME, StoreTime).
 
 %% @doc Add a sample of elapsed time spent waiting for a batch
 %% `emqx_ds_replication_layer:next'
 -spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok.
 observe_next_time(DB, NextTime) ->
-    catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime).
+    catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_BUILTIN_NEXT_TIME, NextTime).
+
+-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_seek_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc).
+
+-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_next_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc).
+
+-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_collision_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc).
 
 prometheus_meta() ->
     lists:map(
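
The three new inc_lts_*_counter/2 functions follow the existing pattern: they accept the storage-layer shard id (a {DB, Shard} tuple, per the pattern match above) and an integer delta, and aggregate per DB. A minimal usage sketch from a console, with the DB and shard names assumed for illustration:

    %% Hypothetical DB/shard names; the counters are accumulated under the DB.
    Shard = {my_durable_db, <<"0">>},
    emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, 10),
    emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, 120),
    emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, 3).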

+ 38 - 19
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -44,6 +44,7 @@
 
 -export_type([options/0]).
 
+-include("emqx_ds_metrics.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
@@ -115,8 +116,6 @@
         ?last_seen_key := binary()
     }.
 
--define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
-
 %% Limit on the number of wildcard levels in the learned topic trie:
 -define(WILDCARD_LIMIT, 10).
 
@@ -140,6 +139,8 @@
 -define(DIM_TOPIC, 1).
 -define(DIM_TS, 2).
 
+-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -endif.
@@ -347,13 +348,18 @@ update_iterator(
 ) ->
     {ok, OldIter#{?last_seen_key => DSKey}}.
 
-next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
+next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
+    init_counters(),
     Now = emqx_ds:timestamp_us(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
-    next_until(Schema, It, SafeCutoffTime, BatchSize).
+    try
+        next_until(Schema, It, SafeCutoffTime, BatchSize)
+    after
+        report_counters(Shard)
+    end.
 
 next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
     StartTime >= SafeCutoffTime
@@ -375,20 +381,23 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
         filter := Filter
     } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers),
     try
-        put(?COUNTER, 0),
         next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize)
     after
-        rocksdb:iterator_close(ITHandle),
-        erase(?COUNTER)
+        rocksdb:iterator_close(ITHandle)
     end.
 
-delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
+delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
+    init_counters(),
     Now = emqx_message:timestamp_now(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
-    delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize).
+    try
+        delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)
+    after
+        report_counters(Shard)
+    end.
 
 delete_next_until(
     _Schema,
@@ -417,7 +426,6 @@ delete_next_until(
             DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers
         ),
     try
-        put(?COUNTER, 0),
         LoopContext = LoopContext0#{
             db => DB,
             cf => CF,
@@ -430,8 +438,7 @@ delete_next_until(
         },
         delete_next_loop(LoopContext)
     after
-        rocksdb:iterator_close(ITHandle),
-        erase(?COUNTER)
+        rocksdb:iterator_close(ITHandle)
     end.
 
 %%================================================================================
@@ -477,7 +484,6 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
 next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
 next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
-    inc_counter(),
     #{?tag := ?IT, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
@@ -485,6 +491,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
         Key1 ->
             %% assert
             true = Key1 > Key0,
+            inc_counter(?DS_LTS_SEEK_COUNTER),
             case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
                 {ok, Key, Val} ->
                     {N, It, Acc} = traverse_interval(
@@ -510,6 +517,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
                     Acc = [{Key, Msg} | Acc0],
                     traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
                 false ->
+                    inc_counter(?DS_LTS_COLLISION_COUNTER),
                     traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
             end;
         overflow ->
@@ -521,7 +529,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
 traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {0, It, Acc};
 traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
-    inc_counter(),
+    inc_counter(?DS_LTS_NEXT_COUNTER),
     case rocksdb:iterator_move(ITHandle, next) of
         {ok, Key, Val} ->
             traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
@@ -541,7 +549,7 @@ delete_next_loop(LoopContext0) ->
         iterated_over := AccIter0,
         it_handle := ITHandle
     } = LoopContext0,
-    inc_counter(),
+    inc_counter(?DS_LTS_SEEK_COUNTER),
     #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
@@ -623,7 +631,7 @@ delete_traverse_interval1(LoopContext0) ->
         iterated_over := AccIter,
         storage_iter := It
     } = LoopContext0,
-    inc_counter(),
+    inc_counter(?DS_LTS_NEXT_COUNTER),
     case rocksdb:iterator_move(ITHandle, next) of
         {ok, Key, Val} ->
             delete_traverse_interval(LoopContext0#{
@@ -767,9 +775,20 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) ->
 read_persisted_trie(_IT, {error, invalid_iterator}) ->
     [].
 
-inc_counter() ->
-    N = get(?COUNTER),
-    put(?COUNTER, N + 1).
+inc_counter(Counter) ->
+    N = get(Counter),
+    put(Counter, N + 1).
+
+init_counters() ->
+    _ = [put(I, 0) || I <- ?DS_LTS_COUNTERS],
+    ok.
+
+report_counters(Shard) ->
+    emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)),
+    emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)),
+    emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)),
+    _ = [erase(I) || I <- ?DS_LTS_COUNTERS],
+    ok.
 
 %% @doc Generate a column family ID for the MQTT messages
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
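
The counter lifecycle introduced in this file (zero the process-dictionary slots, bump them during the traversal, flush the totals even when the traversal throws) can be condensed into the following restatement; it only summarizes the init_counters/0, inc_counter/1 and report_counters/1 functions added above, and with_counters/2 itself is not part of the commit:

    %% Condensed restatement of the pattern wrapped around next/4 and delete_next/5.
    with_counters(Shard, TraverseFun) ->
        _ = [put(C, 0) || C <- ?DS_LTS_COUNTERS],            %% init_counters/0
        try
            TraverseFun()                                     %% loop calls inc_counter/1
        after
            %% report_counters/1: push totals to the metrics worker and clean
            %% the process dictionary, regardless of success or failure.
            emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)),
            emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)),
            emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)),
            _ = [erase(C) || C <- ?DS_LTS_COUNTERS]
        end.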

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -677,7 +677,7 @@ t_error_mapping_replication_layer(_Config) ->
     ),
     meck:unload().
 
-%% This test suite verifies the behavior of `store_batch' operation
+%% This testcase verifies the behavior of `store_batch' operation
 %% when the underlying code experiences recoverable or unrecoverable
 %% problems.
 t_store_batch_fail(_Config) ->

+ 10 - 12
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -261,8 +261,7 @@ t_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            timer:sleep(1000)
         end,
         fun(Trace) ->
             %% Must contain exactly one flush with all messages.
@@ -293,19 +292,18 @@ t_non_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            Msgs
         end,
-        fun(Trace) ->
-            %% Should contain one flush per message.
-            ?assertMatch(
-                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+        fun(ExpectedMsgs, Trace) ->
+            ProcessedMsgs = lists:append(
+                ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace))
             ),
-            ok
+            ?assertEqual(
+                ExpectedMsgs,
+                ProcessedMsgs
+            )
         end
-    ),
-    ok.
+    ).
 
 check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
     ExpectedFiltered = lists:filter(

+ 12 - 8
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -37,6 +37,7 @@
 -include_lib("public_key/include/public_key.hrl").
 -include_lib("prometheus/include/prometheus_model.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl").
 
 -import(
     prometheus_model_helpers,
@@ -501,14 +502,17 @@ emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])
 emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, []));
 %% DS
-emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])).
+emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])).
 
 %%--------------------------------------------------------------------
 %% Indicators
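
For context, counter_metrics/1 comes from prometheus_model_helpers (see the -import above) and turns a list of label/value pairs into Prometheus counter samples. A hedged sketch of what the collector effectively does for one of the new LTS counters, with the collected-data shape assumed for illustration:

    %% Assumed shape of the collected data: #{MetricName => [{Labels, Value}]}.
    render_lts_seek(Data) ->
        Pairs = maps:get(?DS_LTS_SEEK_COUNTER, Data, []),   %% e.g. [{[{node, Node}], 42}]
        prometheus_model_helpers:counter_metrics(Pairs).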