Kaynağa Gözat

Merge pull request #12781 from ieQu1/dev/rocksdb-metrics

Add metrics for the builtin durable storage.
ieQu1 1 yıl önce
ebeveyn
işleme
a0ad4fa35c

+ 6 - 2
apps/emqx/src/emqx_broker.erl

@@ -253,8 +253,12 @@ persist_publish(Msg) ->
     case emqx_persistent_message:persist(Msg) of
         ok ->
             [persisted];
-        {_SkipOrError, _Reason} ->
-            % TODO: log errors?
+        {skipped, _} ->
+            [];
+        {error, Recoverable, Reason} ->
+            ?SLOG(debug, #{
+                msg => "failed_to_persist_message", is_recoverable => Recoverable, reason => Reason
+            }),
             []
     end.
 

+ 4 - 1
apps/emqx/src/emqx_metrics.erl

@@ -222,7 +222,9 @@
         % Messages delivered
         {counter, 'messages.delivered'},
         % Messages acked
-        {counter, 'messages.acked'}
+        {counter, 'messages.acked'},
+        % Messages persistently stored
+        {counter, 'messages.persisted'}
     ]
 ).
 
@@ -718,4 +720,5 @@ reserved_idx('overload_protection.gc') -> 403;
 reserved_idx('overload_protection.new_conn') -> 404;
 reserved_idx('messages.validation_succeeded') -> 405;
 reserved_idx('messages.validation_failed') -> 406;
+reserved_idx('messages.persisted') -> 407;
 reserved_idx(_) -> undefined.

+ 2 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -98,7 +98,7 @@ pre_config_update(_Root, _NewConf, _OldConf) ->
 %%--------------------------------------------------------------------
 
 -spec persist(emqx_types:message()) ->
-    ok | {skipped, _Reason} | {error, _TODO}.
+    emqx_ds:store_batch_result() | {skipped, needs_no_persistence}.
 persist(Msg) ->
     ?WHEN_ENABLED(
         case needs_persistence(Msg) andalso has_subscribers(Msg) of
@@ -114,6 +114,7 @@ needs_persistence(Msg) ->
 
 -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
 store_message(Msg) ->
+    emqx_metrics:inc('messages.persisted'),
     emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
 
 has_subscribers(#message{topic = Topic}) ->

+ 4 - 2
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -67,7 +67,8 @@
     %, sent_bytes
     validation_succeeded,
     validation_failed,
-    dropped
+    dropped,
+    persisted
 ]).
 
 -define(GAUGE_SAMPLER_LIST, [
@@ -87,7 +88,8 @@
     sent => sent_msg_rate,
     validation_succeeded => validation_succeeded_rate,
     validation_failed => validation_failed_rate,
-    dropped => dropped_msg_rate
+    dropped => dropped_msg_rate,
+    persisted => persisted_rate
 }).
 
 -define(CURRENT_SAMPLE_NON_RATE,

+ 2 - 1
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -428,7 +428,8 @@ stats(sent) -> emqx_metrics:val('messages.sent');
 stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
 stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded');
 stats(validation_failed) -> emqx_metrics:val('messages.validation_failed');
-stats(dropped) -> emqx_metrics:val('messages.dropped').
+stats(dropped) -> emqx_metrics:val('messages.dropped');
+stats(persisted) -> emqx_metrics:val('messages.persisted').
 
 %% -------------------------------------------------------------------------------------------------
 %% Retained && License Quota

+ 4 - 0
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -192,6 +192,8 @@ swagger_desc(validation_succeeded) ->
     swagger_desc_format("Message validations succeeded ");
 swagger_desc(validation_failed) ->
     swagger_desc_format("Message validations failed ");
+swagger_desc(persisted) ->
+    swagger_desc_format("Messages saved to the durable storage ");
 swagger_desc(subscriptions) ->
     <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(topics) ->
@@ -218,6 +220,8 @@ swagger_desc(validation_succeeded_rate) ->
     swagger_desc_format("Message validations succeeded ", per);
 swagger_desc(validation_failed_rate) ->
     swagger_desc_format("Message validations failed ", per);
+swagger_desc(persisted_rate) ->
+    swagger_desc_format("Messages saved to the durable storage ", per);
 swagger_desc(retained_msg_count) ->
     <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(shared_subscriptions) ->

+ 2 - 2
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -13,7 +13,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--ifndef(EMQX_DS_HRL_HRL).
--define(EMQX_DS_HRL_HRL, true).
+-ifndef(EMQX_DS_HRL).
+-define(EMQX_DS_HRL, true).
 
 -endif.

+ 49 - 0
apps/emqx_durable_storage/include/emqx_ds_metrics.hrl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_METRICS_HRL).
+-define(EMQX_DS_METRICS_HRL, true).
+
+%%%% Egress metrics:
+
+%% Number of successfully flushed batches:
+-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches).
+%% Number of batch flush retries:
+-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry).
+%% Number of batches that weren't flushed due to unrecoverable errors:
+-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed).
+%% Total number of messages that were successfully committed to the storage:
+-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages).
+%% Total size of payloads that were successfully committed to the storage:
+-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes).
+%% Sliding average of flush time (microseconds):
+-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time).
+
+%%%% Storage layer metrics:
+-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time).
+-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time).
+
+%%% LTS Storage counters:
+
+%% This counter is incremented when the iterator seeks to the next interval:
+-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek).
+%% This counter is incremented when the iterator proceeds to the next
+%% key within the interval (this is is best case scenario):
+-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next).
+%% This counter is incremented when the key passes bitmask check, but
+%% the value is rejected by the subsequent post-processing:
+-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision).
+
+-endif.

+ 9 - 1
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -31,7 +31,7 @@
     ensure_shard/1,
     ensure_egress/1
 ]).
--export([which_shards/1]).
+-export([which_dbs/0, which_shards/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -104,6 +104,13 @@ ensure_egress(Shard) ->
 which_shards(DB) ->
     supervisor:which_children(?via(#?shards_sup{db = DB})).
 
+%% @doc Return the list of builtin DS databases that are currently
+%% active on the node.
+-spec which_dbs() -> [emqx_ds:db()].
+which_dbs() ->
+    Key = {n, l, #?db_sup{_ = '_', db = '$1'}},
+    gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]).
+
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================
@@ -111,6 +118,7 @@ which_shards(DB) ->
 init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
+    emqx_ds_builtin_metrics:init_for_db(DB),
     Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
     ok = start_ra_system(DB, Opts),
     Children = [

+ 299 - 0
apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl

@@ -0,0 +1,299 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_builtin_metrics).
+
+%% DS-facing API:
+-export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]).
+
+%% Prometheus-facing API:
+-export([prometheus_meta/0, prometheus_collect/1]).
+
+-export([
+    inc_egress_batches/1,
+    inc_egress_batches_retry/1,
+    inc_egress_batches_failed/1,
+    inc_egress_messages/2,
+    inc_egress_bytes/2,
+
+    observe_egress_flush_time/2,
+
+    observe_store_batch_time/2,
+
+    observe_next_time/2,
+
+    inc_lts_seek_counter/2,
+    inc_lts_next_counter/2,
+    inc_lts_collision_counter/2
+]).
+
+%% behavior callbacks:
+-export([]).
+
+%% internal exports:
+-export([]).
+
+-export_type([shard_metrics_id/0]).
+
+-include("emqx_ds_metrics.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(WORKER, ?MODULE).
+
+-define(STORAGE_LAYER_METRICS, [
+    {slide, ?DS_STORE_BATCH_TIME},
+    {counter, ?DS_LTS_SEEK_COUNTER},
+    {counter, ?DS_LTS_NEXT_COUNTER},
+    {counter, ?DS_LTS_COLLISION_COUNTER}
+]).
+
+-define(FETCH_METRICS, [
+    {slide, ?DS_BUILTIN_NEXT_TIME}
+]).
+
+-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
+
+-define(EGRESS_METRICS, [
+    {counter, ?DS_EGRESS_BATCHES},
+    {counter, ?DS_EGRESS_BATCHES_RETRY},
+    {counter, ?DS_EGRESS_BATCHES_FAILED},
+    {counter, ?DS_EGRESS_MESSAGES},
+    {counter, ?DS_EGRESS_BYTES},
+    {slide, ?DS_EGRESS_FLUSH_TIME}
+]).
+
+-define(SHARD_METRICS, ?EGRESS_METRICS).
+
+-type shard_metrics_id() :: binary().
+
+-elvis([{elvis_style, dont_repeat_yourself, disable}]).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec child_spec() -> supervisor:child_spec().
+child_spec() ->
+    emqx_metrics_worker:child_spec(?WORKER).
+
+%% @doc Initialize metrics that are global for a DS database
+-spec init_for_db(emqx_ds:db()) -> ok.
+init_for_db(DB) ->
+    emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []).
+
+-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id().
+shard_metric_id(DB, ShardId) ->
+    iolist_to_binary([atom_to_list(DB), $/, ShardId]).
+
+%% @doc Initialize metrics that are specific for the shard.
+-spec init_for_shard(shard_metrics_id()) -> ok.
+init_for_shard(ShardId) ->
+    emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []).
+
+%% @doc Increase the number of successfully flushed batches
+-spec inc_egress_batches(shard_metrics_id()) -> ok.
+inc_egress_batches(Id) ->
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES).
+
+%% @doc Increase the number of time the egress worker had to retry
+%% flushing the batch
+-spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
+inc_egress_batches_retry(Id) ->
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY).
+
+%% @doc Increase the number of time the egress worker encountered an
+%% unrecoverable error while trying to flush the batch
+-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
+inc_egress_batches_failed(Id) ->
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED).
+
+%% @doc Increase the number of messages successfully saved to the shard
+-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
+inc_egress_messages(Id, NMessages) ->
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages).
+
+%% @doc Increase the number of messages successfully saved to the shard
+-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
+inc_egress_bytes(Id, NMessages) ->
+    catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages).
+
+%% @doc Add a sample of elapsed time spent flushing the egress to the
+%% Raft log (in microseconds)
+-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
+observe_egress_flush_time(Id, FlushTime) ->
+    catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime).
+
+-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+observe_store_batch_time({DB, _}, StoreTime) ->
+    catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_STORE_BATCH_TIME, StoreTime).
+
+%% @doc Add a sample of elapsed time spent waiting for a batch
+%% `emqx_ds_replication_layer:next'
+-spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok.
+observe_next_time(DB, NextTime) ->
+    catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_BUILTIN_NEXT_TIME, NextTime).
+
+-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_seek_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc).
+
+-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_next_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc).
+
+-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
+inc_lts_collision_counter({DB, _}, Inc) ->
+    catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc).
+
+prometheus_meta() ->
+    lists:map(
+        fun
+            ({counter, A}) ->
+                {A, counter, A};
+            ({slide, A}) ->
+                {A, counter, A}
+        end,
+        ?DB_METRICS ++ ?SHARD_METRICS
+    ).
+
+prometheus_collect(NodeOrAggr) ->
+    maps:merge(prometheus_per_db(NodeOrAggr), prometheus_per_shard(NodeOrAggr)).
+
+prometheus_per_db(NodeOrAggr) ->
+    lists:foldl(
+        fun(DB, Acc) ->
+            prometheus_per_db(NodeOrAggr, DB, Acc)
+        end,
+        #{},
+        emqx_ds_builtin_db_sup:which_dbs()
+    ).
+
+%% This function returns the data in the following format:
+%% ```
+%% #{emqx_ds_store_batch_time =>
+%%     [{[{db, emqx_persistent_message}], 42}],
+%%  ...
+%% '''
+%%
+%% If `NodeOrAggr' = `node' then node name is appended to the list of
+%% labels.
+prometheus_per_db(NodeOrAggr, DB, Acc0) ->
+    Labels = [
+        {db, DB}
+        | case NodeOrAggr of
+            node -> [];
+            _ -> [{node, node()}]
+        end
+    ],
+    #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(?WORKER, DB),
+    %% Collect counters:
+    Acc1 = maps:fold(
+        fun(MetricId, Value, Acc1) ->
+            append_to_key(MetricId, {Labels, Value}, Acc1)
+        end,
+        Acc0,
+        CC
+    ),
+    %% Collect slides:
+    maps:fold(
+        fun(MetricId, Value, Acc2) ->
+            Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2),
+            append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3)
+        end,
+        Acc1,
+        SS
+    ).
+
+%% This function returns the data in the following format:
+%% ```
+%% #{emqx_ds_egress_batches =>
+%%       [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408},
+%%        {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}],
+%%   emqx_ds_egress_batches_retry =>
+%%       [{[{db,emqx_persistent_message},{shard,<<"1">>}],0},
+%%        {[{db,emqx_persistent_message},{shard,<<"0">>}],0}],
+%%   emqx_ds_egress_messages =>
+%%        ...
+%%  }
+%% '''
+%%
+%% If `NodeOrAggr' = `node' then node name is appended to the list of
+%% labels.
+prometheus_per_shard(NodeOrAggr) ->
+    lists:foldl(
+        fun(DB, Acc0) ->
+            lists:foldl(
+                fun(Shard, Acc) ->
+                    prometheus_per_shard(NodeOrAggr, DB, Shard, Acc)
+                end,
+                Acc0,
+                emqx_ds_replication_layer_meta:shards(DB)
+            )
+        end,
+        #{},
+        emqx_ds_builtin_db_sup:which_dbs()
+    ).
+
+prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) ->
+    Labels = [
+        {db, DB},
+        {shard, Shard}
+        | case NodeOrAggr of
+            node -> [];
+            _ -> [{node, node()}]
+        end
+    ],
+    #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(
+        ?WORKER, shard_metric_id(DB, Shard)
+    ),
+    %% Collect counters:
+    Acc1 = maps:fold(
+        fun(MetricId, Value, Acc1) ->
+            append_to_key(MetricId, {Labels, Value}, Acc1)
+        end,
+        Acc0,
+        CC
+    ),
+    %% Collect slides:
+    maps:fold(
+        fun(MetricId, Value, Acc2) ->
+            Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2),
+            append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3)
+        end,
+        Acc1,
+        SS
+    ).
+
+-spec append_to_key(K, V, #{K => [V]}) -> #{K => [V]}.
+append_to_key(Key, Value, Map) ->
+    maps:update_with(
+        Key,
+        fun(L) ->
+            [Value | L]
+        end,
+        [Value],
+        Map
+    ).
+
+slide_value(Interval, Value, Labels0) ->
+    Labels = [{interval, Interval} | Labels0],
+    {Labels, maps:get(Interval, Value, 0)}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl

@@ -81,6 +81,7 @@ stop_db(DB) ->
 %% Chidren are attached dynamically to this one.
 init(?top) ->
     %% Children:
+    MetricsWorker = emqx_ds_builtin_metrics:child_spec(),
     MetadataServer = #{
         id => metadata_server,
         start => {emqx_ds_replication_layer_meta, start_link, []},
@@ -102,7 +103,7 @@ init(?top) ->
         period => 1,
         auto_shutdown => never
     },
-    {ok, {SupFlags, [MetadataServer, DBsSup]}};
+    {ok, {SupFlags, [MetricsWorker, MetadataServer, DBsSup]}};
 init(?databases) ->
     %% Children are added dynamically:
     SupFlags = #{

+ 7 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -329,7 +329,11 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case ra_next(DB, Shard, StorageIter0, BatchSize) of
+    T0 = erlang:monotonic_time(microsecond),
+    Result = ra_next(DB, Shard, StorageIter0, BatchSize),
+    T1 = erlang:monotonic_time(microsecond),
+    emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
+    case Result of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -547,6 +551,8 @@ list_nodes() ->
     end
 ).
 
+-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
+    ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err.
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,

+ 265 - 106
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -40,6 +40,7 @@
 
 -export_type([]).
 
+-include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 %%================================================================================
@@ -49,8 +50,13 @@
 -define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
 -define(flush, flush).
 
--record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
--record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}).
+-record(enqueue_req, {
+    messages :: [emqx_types:message()],
+    sync :: boolean(),
+    atomic :: boolean(),
+    n_messages :: non_neg_integer(),
+    payload_bytes :: non_neg_integer()
+}).
 
 %%================================================================================
 %% API functions
@@ -61,44 +67,32 @@ start_link(DB, Shard) ->
     gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
 
 -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-    ok.
+    emqx_ds:store_batch_result().
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    case maps:get(atomic, Opts, false) of
-        false ->
-            lists:foreach(
-                fun(Message) ->
-                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-                    gen_server:call(
-                        ?via(DB, Shard),
-                        #enqueue_req{
-                            message = Message,
-                            sync = Sync
-                        },
-                        infinity
-                    )
-                end,
-                Messages
+    Atomic = maps:get(atomic, Opts, false),
+    %% Usually we expect all messages in the batch to go into the
+    %% single shard, so this function is optimized for the happy case.
+    case shards_of_batch(DB, Messages) of
+        [{Shard, {NMsgs, NBytes}}] ->
+            %% Happy case:
+            enqueue_call_or_cast(
+                ?via(DB, Shard),
+                #enqueue_req{
+                    messages = Messages,
+                    sync = Sync,
+                    atomic = Atomic,
+                    n_messages = NMsgs,
+                    payload_bytes = NBytes
+                }
             );
-        true ->
-            maps:foreach(
-                fun(Shard, Batch) ->
-                    gen_server:call(
-                        ?via(DB, Shard),
-                        #enqueue_atomic_req{
-                            batch = Batch,
-                            sync = Sync
-                        },
-                        infinity
-                    )
-                end,
-                maps:groups_from_list(
-                    fun(Message) ->
-                        emqx_ds_replication_layer:shard_of_message(DB, Message, clientid)
-                    end,
-                    Messages
-                )
-            )
+        [_, _ | _] when Atomic ->
+            %% It's impossible to commit a batch to multiple shards
+            %% atomically
+            {error, unrecoverable, atomic_commit_to_multiple_shards};
+        _Shards ->
+            %% Use a slower implementation for the unlikely case:
+            repackage_messages(DB, Messages, Sync)
     end.
 
 %%================================================================================
@@ -108,35 +102,65 @@ store_batch(DB, Messages, Opts) ->
 -record(s, {
     db :: emqx_ds:db(),
     shard :: emqx_ds_replication_layer:shard_id(),
+    metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
+    n_retries = 0 :: non_neg_integer(),
+    %% FIXME: Currently max_retries is always 0, because replication
+    %% layer doesn't guarantee idempotency. Retrying would create
+    %% duplicate messages.
+    max_retries = 0 :: non_neg_integer(),
     n = 0 :: non_neg_integer(),
-    tref :: reference(),
-    batch = [] :: [emqx_types:message()],
+    n_bytes = 0 :: non_neg_integer(),
+    tref :: undefined | reference(),
+    queue :: queue:queue(emqx_types:message()),
     pending_replies = [] :: [gen_server:from()]
 }).
 
 init([DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
+    logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
+    MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
+    ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
     S = #s{
         db = DB,
         shard = Shard,
-        tref = start_timer()
+        metrics_id = MetricsId,
+        queue = queue:new()
     },
-    {ok, S}.
+    {ok, start_timer(S)}.
 
-handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
-    do_enqueue(From, Sync, Msg, S);
-handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) ->
-    Len = length(Batch),
-    do_enqueue(From, Sync, {atomic, Len, Batch}, S);
+handle_call(
+    #enqueue_req{
+        messages = Msgs,
+        sync = Sync,
+        atomic = Atomic,
+        n_messages = NMsgs,
+        payload_bytes = NBytes
+    },
+    From,
+    S0 = #s{pending_replies = Replies0}
+) ->
+    S = S0#s{pending_replies = [From | Replies0]},
+    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
+handle_cast(
+    #enqueue_req{
+        messages = Msgs,
+        sync = Sync,
+        atomic = Atomic,
+        n_messages = NMsgs,
+        payload_bytes = NBytes
+    },
+    S
+) ->
+    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
 handle_cast(_Cast, S) ->
     {noreply, S}.
 
 handle_info(?flush, S) ->
-    {noreply, do_flush(S)};
+    {noreply, flush(S)};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -151,80 +175,215 @@ terminate(_Reason, _S) ->
 %% Internal functions
 %%================================================================================
 
+enqueue(
+    Sync,
+    Atomic,
+    Msgs,
+    BatchSize,
+    BatchBytes,
+    S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
+) ->
+    %% At this point we don't split the batches, even when they aren't
+    %% atomic. It wouldn't win us anything in terms of memory, and
+    %% EMQX currently feeds data to DS in very small batches, so
+    %% granularity should be fine enough.
+    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
+    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
+    NMsgs = NMsgs0 + BatchSize,
+    NBytes = NBytes0 + BatchBytes,
+    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
+        true ->
+            %% Adding this batch would cause buffer to overflow. Flush
+            %% it now, and retry:
+            cancel_timer(S0),
+            S1 = flush(S0),
+            enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
+        false ->
+            %% The buffer is empty, we enqueue the atomic batch in its
+            %% entirety:
+            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
+            S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
+            case NMsgs >= NMax orelse NBytes >= NBytes of
+                true ->
+                    cancel_timer(S1),
+                    flush(S1);
+                false ->
+                    S1
+            end
+    end.
+
 -define(COOLDOWN_MIN, 1000).
 -define(COOLDOWN_MAX, 5000).
 
-do_flush(S = #s{batch = []}) ->
-    S#s{tref = start_timer()};
+flush(S) ->
+    start_timer(do_flush(S)).
+
+do_flush(S0 = #s{n = 0}) ->
+    S0;
 do_flush(
-    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
+    S = #s{
+        queue = Q,
+        pending_replies = Replies,
+        db = DB,
+        shard = Shard,
+        metrics_id = Metrics,
+        n_retries = Retries,
+        max_retries = MaxRetries
+    }
 ) ->
-    case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
+    Messages = queue:to_list(Q),
+    T0 = erlang:monotonic_time(microsecond),
+    Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages),
+    T1 = erlang:monotonic_time(microsecond),
+    emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
+    case Result of
         ok ->
+            emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
+            emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
+            emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
             ?tp(
                 emqx_ds_replication_layer_egress_flush,
                 #{db => DB, shard => Shard, batch => Messages}
             ),
             lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
-            true = erlang:garbage_collect(),
-            ok;
-        Error ->
-            true = erlang:garbage_collect(),
+            erlang:garbage_collect(),
+            S#s{
+                n = 0,
+                n_bytes = 0,
+                queue = queue:new(),
+                pending_replies = []
+            };
+        {timeout, ServerId} when Retries < MaxRetries ->
+            %% Note: this is a hot loop, so we report error messages
+            %% with `debug' level to avoid wiping the logs. Instead,
+            %% error the detection must rely on the metrics. Debug
+            %% logging can be enabled for the particular egress server
+            %% via logger domain.
+            ?tp(
+                debug,
+                emqx_ds_replication_layer_egress_flush_retry,
+                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
+            ),
+            %% Retry sending the batch:
+            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
+            erlang:garbage_collect(),
+            %% We block the gen_server until the next retry.
+            BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            timer:sleep(BlockTime),
+            S#s{n_retries = Retries + 1};
+        Err ->
             ?tp(
-                warning,
+                debug,
                 emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, reason => Error}
+                #{db => DB, shard => Shard, error => Err}
             ),
-            Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
-            ok = timer:sleep(Cooldown),
-            %% Since we drop the entire batch here, we at least reply callers with an
-            %% error so they don't hang indefinitely in the `gen_server' call with
-            %% `infinity' timeout.
-            lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies)
-    end,
-    S#s{
-        n = 0,
-        batch = [],
-        pending_replies = [],
-        tref = start_timer()
-    }.
-
-do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
-    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    S1 =
-        case MsgOrBatch of
-            {atomic, NumMsgs, Msgs} ->
-                S0#s{n = N + NumMsgs, batch = Msgs ++ Batch};
-            Msg ->
-                S0#s{n = N + 1, batch = [Msg | Batch]}
-        end,
-    %% TODO: later we may want to delay the reply until the message is
-    %% replicated, but it requies changes to the PUBACK/PUBREC flow to
-    %% allow for async replies. For now, we ack when the message is
-    %% _buffered_ rather than stored.
-    %%
-    %% Otherwise, the client would freeze for at least flush interval,
-    %% or until the buffer is filled.
-    S2 =
-        case Sync of
-            true ->
-                S1#s{pending_replies = [From | Replies]};
-            false ->
-                gen_server:reply(From, ok),
-                S1
+            emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
+            Reply =
+                case Err of
+                    {error, _, _} -> Err;
+                    {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}};
+                    _ -> {error, unrecoverable, Err}
+                end,
+            lists:foreach(
+                fun(From) -> gen_server:reply(From, Reply) end, Replies
+            ),
+            erlang:garbage_collect(),
+            S#s{
+                n = 0,
+                n_bytes = 0,
+                queue = queue:new(),
+                pending_replies = [],
+                n_retries = 0
+            }
+    end.
+
+-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
+    [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}]
+when
+    NMessages :: non_neg_integer(),
+    NBytes :: non_neg_integer().
+shards_of_batch(DB, Messages) ->
+    maps:to_list(
+        lists:foldl(
+            fun(Message, Acc) ->
+                %% TODO: sharding strategy must be part of the DS DB schema:
+                Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                Size = payload_size(Message),
+                maps:update_with(
+                    Shard,
+                    fun({N, S}) ->
+                        {N + 1, S + Size}
+                    end,
+                    {1, Size},
+                    Acc
+                )
+            end,
+            #{},
+            Messages
+        )
+    ).
+
+repackage_messages(DB, Messages, Sync) ->
+    Batches = lists:foldl(
+        fun(Message, Acc) ->
+            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+            Size = payload_size(Message),
+            maps:update_with(
+                Shard,
+                fun({N, S, Msgs}) ->
+                    {N + 1, S + Size, [Message | Msgs]}
+                end,
+                {1, Size, [Message]},
+                Acc
+            )
         end,
-    S =
-        case N >= NMax of
-            true ->
-                _ = erlang:cancel_timer(S2#s.tref),
-                do_flush(S2);
-            false ->
-                S2
+        #{},
+        Messages
+    ),
+    maps:fold(
+        fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
+            Err = enqueue_call_or_cast(
+                ?via(DB, Shard),
+                #enqueue_req{
+                    messages = lists:reverse(RevMessages),
+                    sync = Sync,
+                    atomic = false,
+                    n_messages = NMsgs,
+                    payload_bytes = ByteSize
+                }
+            ),
+            compose_errors(ErrAcc, Err)
         end,
-    %% TODO: add a backpressure mechanism for the server to avoid
-    %% building a long message queue.
-    {noreply, S}.
+        ok,
+        Batches
+    ).
 
-start_timer() ->
+enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) ->
+    gen_server:call(To, Req, infinity);
+enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) ->
+    gen_server:cast(To, Req).
+
+compose_errors(ErrAcc, ok) ->
+    ErrAcc;
+compose_errors(ok, Err) ->
+    Err;
+compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) ->
+    {error, unrecoverable, Err};
+compose_errors(ErrAcc, _Err) ->
+    ErrAcc.
+
+start_timer(S) ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
-    erlang:send_after(Interval, self(), ?flush).
+    Tref = erlang:send_after(Interval, self(), ?flush),
+    S#s{tref = Tref}.
+
+cancel_timer(#s{tref = undefined}) ->
+    ok;
+cancel_timer(#s{tref = TRef}) ->
+    _ = erlang:cancel_timer(TRef),
+    ok.
+
+%% @doc Return approximate size of the MQTT message (it doesn't take
+%% all things into account, for example headers and extras)
+payload_size(#message{payload = P, topic = T}) ->
+    size(P) + size(T).

+ 38 - 19
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -44,6 +44,7 @@
 
 -export_type([options/0]).
 
+-include("emqx_ds_metrics.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
@@ -115,8 +116,6 @@
         ?last_seen_key := binary()
     }.
 
--define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
-
 %% Limit on the number of wildcard levels in the learned topic trie:
 -define(WILDCARD_LIMIT, 10).
 
@@ -140,6 +139,8 @@
 -define(DIM_TOPIC, 1).
 -define(DIM_TS, 2).
 
+-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -endif.
@@ -347,13 +348,18 @@ update_iterator(
 ) ->
     {ok, OldIter#{?last_seen_key => DSKey}}.
 
-next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
+next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
+    init_counters(),
     Now = emqx_ds:timestamp_us(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
-    next_until(Schema, It, SafeCutoffTime, BatchSize).
+    try
+        next_until(Schema, It, SafeCutoffTime, BatchSize)
+    after
+        report_counters(Shard)
+    end.
 
 next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when
     StartTime >= SafeCutoffTime
@@ -375,20 +381,23 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
         filter := Filter
     } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers),
     try
-        put(?COUNTER, 0),
         next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize)
     after
-        rocksdb:iterator_close(ITHandle),
-        erase(?COUNTER)
+        rocksdb:iterator_close(ITHandle)
     end.
 
-delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
+delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
+    init_counters(),
     Now = emqx_message:timestamp_now(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
-    delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize).
+    try
+        delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)
+    after
+        report_counters(Shard)
+    end.
 
 delete_next_until(
     _Schema,
@@ -417,7 +426,6 @@ delete_next_until(
             DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers
         ),
     try
-        put(?COUNTER, 0),
         LoopContext = LoopContext0#{
             db => DB,
             cf => CF,
@@ -430,8 +438,7 @@ delete_next_until(
         },
         delete_next_loop(LoopContext)
     after
-        rocksdb:iterator_close(ITHandle),
-        erase(?COUNTER)
+        rocksdb:iterator_close(ITHandle)
     end.
 
 %%================================================================================
@@ -477,7 +484,6 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
 next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
 next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
-    inc_counter(),
     #{?tag := ?IT, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
@@ -485,6 +491,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
         Key1 ->
             %% assert
             true = Key1 > Key0,
+            inc_counter(?DS_LTS_SEEK_COUNTER),
             case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
                 {ok, Key, Val} ->
                     {N, It, Acc} = traverse_interval(
@@ -510,6 +517,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
                     Acc = [{Key, Msg} | Acc0],
                     traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
                 false ->
+                    inc_counter(?DS_LTS_COLLISION_COUNTER),
                     traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
             end;
         overflow ->
@@ -521,7 +529,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -
 traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {0, It, Acc};
 traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
-    inc_counter(),
+    inc_counter(?DS_LTS_NEXT_COUNTER),
     case rocksdb:iterator_move(ITHandle, next) of
         {ok, Key, Val} ->
             traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
@@ -541,7 +549,7 @@ delete_next_loop(LoopContext0) ->
         iterated_over := AccIter0,
         it_handle := ITHandle
     } = LoopContext0,
-    inc_counter(),
+    inc_counter(?DS_LTS_SEEK_COUNTER),
     #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0,
     case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
         overflow ->
@@ -623,7 +631,7 @@ delete_traverse_interval1(LoopContext0) ->
         iterated_over := AccIter,
         storage_iter := It
     } = LoopContext0,
-    inc_counter(),
+    inc_counter(?DS_LTS_NEXT_COUNTER),
     case rocksdb:iterator_move(ITHandle, next) of
         {ok, Key, Val} ->
             delete_traverse_interval(LoopContext0#{
@@ -767,9 +775,20 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) ->
 read_persisted_trie(_IT, {error, invalid_iterator}) ->
     [].
 
-inc_counter() ->
-    N = get(?COUNTER),
-    put(?COUNTER, N + 1).
+inc_counter(Counter) ->
+    N = get(Counter),
+    put(Counter, N + 1).
+
+init_counters() ->
+    _ = [put(I, 0) || I <- ?DS_LTS_COUNTERS],
+    ok.
+
+report_counters(Shard) ->
+    emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)),
+    emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)),
+    emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)),
+    _ = [erase(I) || I <- ?DS_LTS_COUNTERS],
+    ok.
 
 %% @doc Generate a column family ID for the MQTT messages
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].

+ 8 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -254,8 +254,15 @@ drop_shard(Shard) ->
 store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
+    ?tp(emqx_ds_storage_layer_store_batch, #{
+        shard => Shard, messages => Messages, options => Options
+    }),
     #{module := Mod, data := GenData} = generation_at(Shard, Time),
-    Mod:store_batch(Shard, GenData, Messages, Options);
+    T0 = erlang:monotonic_time(microsecond),
+    Result = Mod:store_batch(Shard, GenData, Messages, Options),
+    T1 = erlang:monotonic_time(microsecond),
+    emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
+    Result;
 store_batch(_Shard, [], _Options) ->
     ok.
 

+ 84 - 16
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -322,17 +322,10 @@ t_09_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}),
+            ?assertMatch(#{batch := [_, _, _]}, Flush)
         end,
-        fun(Trace) ->
-            %% Must contain exactly one flush with all messages.
-            ?assertMatch(
-                [#{batch := [_, _, _]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
-            ),
-            ok
-        end
+        []
     ),
     ok.
 
@@ -355,14 +348,15 @@ t_10_non_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            timer:sleep(1000)
         end,
         fun(Trace) ->
             %% Should contain one flush per message.
+            Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)),
+            ?assertMatch([_], Batches),
             ?assertMatch(
-                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+                [_, _, _],
+                lists:append(Batches)
             ),
             ok
         end
@@ -681,10 +675,83 @@ t_error_mapping_replication_layer(_Config) ->
         length([error || {error, _, _} <- Results2]) > 0,
         Results2
     ),
-
-    snabbkaffe:stop(),
     meck:unload().
 
+%% This testcase verifies the behavior of `store_batch' operation
+%% when the underlying code experiences recoverable or unrecoverable
+%% problems.
+t_store_batch_fail(_Config) ->
+    ?check_trace(
+        #{timetrap => 15_000},
+        try
+            meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
+            DB = ?FUNCTION_NAME,
+            ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+            %% Success:
+            Batch1 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
+            %% Inject unrecoverable error:
+            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
+                {error, unrecoverable, mock}
+            end),
+            Batch2 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
+            ],
+            ?assertMatch(
+                {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
+            ),
+            meck:unload(emqx_ds_storage_layer),
+            %% Inject a recoveralbe error:
+            meck:new(ra, [passthrough, no_history]),
+            meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
+                ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
+                {timeout, mock}
+            end),
+            Batch3 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
+                message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
+                message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
+                message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
+            ],
+            %% Note: due to idempotency issues the number of retries
+            %% is currently set to 0:
+            ?assertMatch(
+                {error, recoverable, {timeout, mock}},
+                emqx_ds:store_batch(DB, Batch3, #{sync => true})
+            ),
+            meck:unload(ra),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
+            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
+        after
+            meck:unload()
+        end,
+        [
+            {"message ordering", fun(StoredMessages, _Trace) ->
+                [{_, Stream1}, {_, Stream2}] = StoredMessages,
+                ?assertMatch(
+                    [
+                        #message{payload = <<"1">>},
+                        #message{payload = <<"2">>},
+                        #message{payload = <<"5">>},
+                        #message{payload = <<"7">>}
+                    ],
+                    Stream1
+                ),
+                ?assertMatch(
+                    [
+                        #message{payload = <<"6">>},
+                        #message{payload = <<"8">>}
+                    ],
+                    Stream2
+                )
+            end}
+        ]
+    ).
+
 update_data_set() ->
     [
         [
@@ -748,6 +815,7 @@ init_per_testcase(_TC, Config) ->
     Config.
 
 end_per_testcase(_TC, _Config) ->
+    snabbkaffe:stop(),
     ok = application:stop(emqx_durable_storage),
     mria:stop(),
     _ = mnesia:delete_schema([node()]),

+ 10 - 12
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -261,8 +261,7 @@ t_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            timer:sleep(1000)
         end,
         fun(Trace) ->
             %% Must contain exactly one flush with all messages.
@@ -293,19 +292,18 @@ t_non_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            Msgs
         end,
-        fun(Trace) ->
-            %% Should contain one flush per message.
-            ?assertMatch(
-                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+        fun(ExpectedMsgs, Trace) ->
+            ProcessedMsgs = lists:append(
+                ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace))
             ),
-            ok
+            ?assertEqual(
+                ExpectedMsgs,
+                ProcessedMsgs
+            )
         end
-    ),
-    ok.
+    ).
 
 check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
     ExpectedFiltered = lists:filter(

+ 10 - 2
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -63,9 +63,17 @@ consume(DB, TopicFilter) ->
     consume(DB, TopicFilter, 0).
 
 consume(DB, TopicFilter, StartTime) ->
-    Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     lists:flatmap(
-        fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end,
+        fun({_Stream, Msgs}) ->
+            Msgs
+        end,
+        consume_per_stream(DB, TopicFilter, StartTime)
+    ).
+
+consume_per_stream(DB, TopicFilter, StartTime) ->
+    Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    lists:map(
+        fun({_Rank, Stream}) -> {Stream, consume_stream(DB, Stream, TopicFilter, StartTime)} end,
         Streams
     ).
 

+ 34 - 2
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -37,6 +37,7 @@
 -include_lib("public_key/include/public_key.hrl").
 -include_lib("prometheus/include/prometheus_model.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl").
 
 -import(
     prometheus_model_helpers,
@@ -212,11 +213,30 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
 
     ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
     ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)),
+    ok = maybe_add_ds_collect_family(Callback, RawData),
     ok = maybe_license_add_collect_family(Callback, RawData),
     ok;
 collect_mf(_Registry, _Callback) ->
     ok.
 
+maybe_add_ds_collect_family(Callback, RawData) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            add_collect_family(
+                Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData)
+            );
+        false ->
+            ok
+    end.
+
+maybe_collect_ds_data(Mode) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            #{ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode)};
+        false ->
+            #{}
+    end.
+
 %% @private
 collect(<<"json">>) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
@@ -251,7 +271,7 @@ add_collect_family(Name, Data, Callback, Type) ->
 
 %% behaviour
 fetch_from_local_node(Mode) ->
-    {node(), #{
+    {node(), (maybe_collect_ds_data(Mode))#{
         stats_data => stats_data(Mode),
         vm_data => vm_data(Mode),
         cluster_data => cluster_data(Mode),
@@ -480,7 +500,19 @@ emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])).
+emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, []));
+%% DS
+emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])).
 
 %%--------------------------------------------------------------------
 %% Indicators

+ 1 - 0
apps/emqx_utils/README.md

@@ -16,6 +16,7 @@ handling, data conversions, and more.
 - `emqx_utils_json`: JSON encoding and decoding
 - `emqx_utils_maps`: convenience functions for map lookup and manipulation like
   deep_get etc.
+- `emqx_metrics`: counters, gauges, slides
 
 ## Contributing
 

apps/emqx/src/emqx_metrics_worker.erl → apps/emqx_utils/src/emqx_metrics_worker.erl


+ 2 - 3
apps/emqx/test/emqx_metrics_worker_SUITE.erl

@@ -31,18 +31,17 @@ suite() ->
 -define(NAME, ?MODULE).
 
 init_per_suite(Config) ->
-    {ok, _} = emqx_metrics_worker:start_link(?NAME),
     Config.
 
 end_per_suite(_Config) ->
-    ok = emqx_metrics_worker:stop(?NAME).
+    ok.
 
 init_per_testcase(_, Config) ->
-    ok = emqx_metrics_worker:stop(?NAME),
     {ok, _} = emqx_metrics_worker:start_link(?NAME),
     Config.
 
 end_per_testcase(_, _Config) ->
+    ok = emqx_metrics_worker:stop(?NAME),
     ok.
 
 t_get_metrics(_) ->

+ 29 - 0
changes/ce/feat-12781.en.md

@@ -0,0 +1,29 @@
+Added metrics related to EMQX durable storage to Prometheus.
+
+New metrics:
+
+- `emqx_ds_egress_batches`
+
+- `emqx_ds_egress_batches_retry`
+
+- `emqx_ds_egress_batches_failed`
+
+- `emqx_ds_egress_messages`
+
+- `emqx_ds_egress_bytes`
+
+- `emqx_ds_egress_flush_time`
+
+- `emqx_ds_store_batch_time`
+
+- `emqx_ds_builtin_next_time`
+
+- `emqx_ds_storage_bitfield_lts_counter_seek`
+
+- `emqx_ds_storage_bitfield_lts_counter_next`
+
+- `emqx_ds_storage_bitfield_lts_counter_collision`
+
+Note: these metrics are only visible when session persistence is enabled.
+
+Number of persisted messages has been also added to the dashboard.