Просмотр исходного кода

fix(ds): Actually retry sending batch

ieQu1 2 лет назад
Родитель
Сommit
75b092bf0e

+ 9 - 1
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -31,7 +31,7 @@
     ensure_shard/1,
     ensure_egress/1
 ]).
--export([which_shards/1]).
+-export([which_dbs/0, which_shards/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -104,6 +104,13 @@ ensure_egress(Shard) ->
 which_shards(DB) ->
     supervisor:which_children(?via(#?shards_sup{db = DB})).
 
+%% @doc Return the list of builtin DS databases that are currently
+%% active on the node.
+-spec which_dbs() -> [emqx_ds:db()].
+which_dbs() ->
+    Key = {n, l, #?db_sup{_ = '_', db = '$1'}},
+    gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]).
+
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================
@@ -111,6 +118,7 @@ which_shards(DB) ->
 init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
+    emqx_ds_builtin_metrics:init_for_db(DB),
     Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
     ok = start_ra_system(DB, Opts),
     Children = [

+ 118 - 14
apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl

@@ -15,11 +15,16 @@
 %%--------------------------------------------------------------------
 -module(emqx_ds_builtin_metrics).
 
-%% API:
+%% DS-facing API:
 -export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]).
+
+%% Prometheus-facing API:
+-export([prometheus_meta/0, prometheus_collect/1]).
+
 -export([
     inc_egress_batches/1,
     inc_egress_batches_retry/1,
+    inc_egress_batches_failed/1,
     inc_egress_messages/2,
     inc_egress_bytes/2,
     observe_egress_flush_time/2
@@ -42,11 +47,12 @@
 -define(DB_METRICS, []).
 
 -define(SHARD_METRICS, [
-    'egress.batches',
-    'egress.batches.retry',
-    'egress.messages',
-    'egress.bytes',
-    {slide, 'egress.flush_time'}
+    {counter, 'emqx_ds_egress_batches'},
+    {counter, 'emqx_ds_egress_batches_retry'},
+    {counter, 'emqx_ds_egress_batches_failed'},
+    {counter, 'emqx_ds_egress_messages'},
+    {counter, 'emqx_ds_egress_bytes'},
+    {slide, 'emqx_ds_egress_flush_time'}
 ]).
 
 -type shard_metrics_id() :: binary().
@@ -59,14 +65,16 @@
 child_spec() ->
     emqx_metrics_worker:child_spec(?WORKER).
 
+%% @doc Initialize metrics that are global for a DS database
 -spec init_for_db(emqx_ds:db()) -> ok.
-init_for_db(DB) ->
-    emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []).
+init_for_db(_DB) ->
+    ok.
 
 -spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id().
 shard_metric_id(DB, ShardId) ->
     iolist_to_binary([atom_to_list(DB), $/, ShardId]).
 
+%% @doc Initialize metrics that are specific for the shard.
 -spec init_for_shard(shard_metrics_id()) -> ok.
 init_for_shard(ShardId) ->
     emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []).
@@ -74,28 +82,124 @@ init_for_shard(ShardId) ->
 %% @doc Increase the number of successfully flushed batches
 -spec inc_egress_batches(shard_metrics_id()) -> ok.
 inc_egress_batches(Id) ->
-    emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches').
+    emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches').
 
 %% @doc Increase the number of time the egress worker had to retry
 %% flushing the batch
 -spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
 inc_egress_batches_retry(Id) ->
-    emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches.retry').
+    emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry').
+
+%% @doc Increase the number of time the egress worker encountered an
+%% unrecoverable error while trying to flush the batch
+-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
+inc_egress_batches_failed(Id) ->
+    emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed').
 
 %% @doc Increase the number of messages successfully saved to the shard
 -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
 inc_egress_messages(Id, NMessages) ->
-    emqx_metrics_worker:inc(?WORKER, Id, 'egress.messages', NMessages).
+    emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages).
 
 %% @doc Increase the number of messages successfully saved to the shard
 -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
 inc_egress_bytes(Id, NMessages) ->
-    emqx_metrics_worker:inc(?WORKER, Id, 'egress.bytes', NMessages).
+    emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages).
 
-%% @doc Add a sample of time spent flushing the egress to the Raft log (in microseconds)
+%% @doc Add a sample of elapsed time spent flushing the egress to the
+%% Raft log (in microseconds)
 -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
 observe_egress_flush_time(Id, FlushTime) ->
-    emqx_metrics_worker:observe(?WORKER, Id, 'egress.flush_time', FlushTime).
+    emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
+
+prometheus_meta() ->
+    lists:map(
+        fun
+            ({counter, A}) ->
+                {A, counter, A};
+            ({slide, A}) ->
+                {A, counter, A}
+        end,
+        ?SHARD_METRICS
+    ).
+
+prometheus_collect(NodeOrAggr) ->
+    prometheus_per_shard(NodeOrAggr).
+
+%% This function returns the data in the following format:
+%% ```
+%% #{emqx_ds_egress_batches =>
+%%       [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408},
+%%        {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}],
+%%   emqx_ds_egress_batches_retry =>
+%%       [{[{db,emqx_persistent_message},{shard,<<"1">>}],0},
+%%        {[{db,emqx_persistent_message},{shard,<<"0">>}],0}],
+%%   emqx_ds_egress_messages =>
+%%        ...
+%%  }
+%% '''
+%%
+%% If `NodeOrAggr' = `node' then node name is appended to the list of
+%% labels.
+prometheus_per_shard(NodeOrAggr) ->
+    lists:foldl(
+        fun(DB, Acc0) ->
+            lists:foldl(
+                fun(Shard, Acc) ->
+                    prometheus_per_shard(NodeOrAggr, DB, Shard, Acc)
+                end,
+                Acc0,
+                emqx_ds_replication_layer_meta:shards(DB)
+            )
+        end,
+        #{},
+        emqx_ds_builtin_db_sup:which_dbs()
+    ).
+
+prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) ->
+    Labels = [
+        {db, DB},
+        {shard, Shard}
+        | case NodeOrAggr of
+            node -> [];
+            _ -> [{node, node()}]
+        end
+    ],
+    #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(
+        ?WORKER, shard_metric_id(DB, Shard)
+    ),
+    %% Collect counters:
+    Acc1 = maps:fold(
+        fun(MetricId, Value, Acc1) ->
+            append_to_key(MetricId, {Labels, Value}, Acc1)
+        end,
+        Acc0,
+        CC
+    ),
+    %% Collect slides:
+    maps:fold(
+        fun(MetricId, Value, Acc2) ->
+            Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2),
+            append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3)
+        end,
+        Acc1,
+        SS
+    ).
+
+-spec append_to_key(K, V, #{K => [V]}) -> #{K => [V]}.
+append_to_key(Key, Value, Map) ->
+    maps:update_with(
+        Key,
+        fun(L) ->
+            [Value | L]
+        end,
+        [Value],
+        Map
+    ).
+
+slide_value(Interval, Value, Labels0) ->
+    Labels = [{interval, Interval} | Labels0],
+    {Labels, maps:get(Interval, Value, 0)}.
 
 %%================================================================================
 %% Internal functions

+ 171 - 111
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -51,13 +51,10 @@
 -define(flush, flush).
 
 -record(enqueue_req, {
-    message :: emqx_types:message(),
-    sync :: boolean(),
-    payload_bytes :: non_neg_integer()
-}).
--record(enqueue_atomic_req, {
-    batch :: [emqx_types:message()],
+    messages :: [emqx_types:message()],
     sync :: boolean(),
+    atomic :: boolean(),
+    n_messages :: non_neg_integer(),
     payload_bytes :: non_neg_integer()
 }).
 
@@ -70,53 +67,33 @@ start_link(DB, Shard) ->
     gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
 
 -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-    ok.
+    emqx_ds:store_batch_result().
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    case maps:get(atomic, Opts, false) of
-        false ->
-            lists:foreach(
-                fun(Message) ->
-                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-                    gen_server:call(
-                        ?via(DB, Shard),
-                        #enqueue_req{
-                            message = Message,
-                            sync = Sync,
-                            payload_bytes = payload_size(Message)
-                        },
-                        infinity
-                    )
-                end,
-                Messages
+    Atomic = maps:get(atomic, Opts, false),
+    %% Usually we expect all messages in the batch to go into the
+    %% single shard, so this function is optimized for the happy case.
+    case shards_of_batch(DB, Messages) of
+        [{Shard, {NMsgs, NBytes}}] ->
+            %% Happy case:
+            gen_server:call(
+                ?via(DB, Shard),
+                #enqueue_req{
+                    messages = Messages,
+                    sync = Sync,
+                    atomic = Atomic,
+                    n_messages = NMsgs,
+                    payload_bytes = NBytes
+                },
+                infinity
             );
-        true ->
-            maps:foreach(
-                fun(Shard, Batch) ->
-                    PayloadBytes = lists:foldl(
-                        fun(Msg, Acc) ->
-                            Acc + payload_size(Msg)
-                        end,
-                        0,
-                        Batch
-                    ),
-                    gen_server:call(
-                        ?via(DB, Shard),
-                        #enqueue_atomic_req{
-                            batch = Batch,
-                            sync = Sync,
-                            payload_bytes = PayloadBytes
-                        },
-                        infinity
-                    )
-                end,
-                maps:groups_from_list(
-                    fun(Message) ->
-                        emqx_ds_replication_layer:shard_of_message(DB, Message, clientid)
-                    end,
-                    Messages
-                )
-            )
+        [_, _ | _] when Atomic ->
+            %% It's impossible to commit a batch to multiple shards
+            %% atomically
+            {error, unrecoverable, atomic_commit_to_multiple_shards};
+        _Shards ->
+            %% Use a slower implementation for the unlikely case:
+            repackage_messages(DB, Messages, Sync, Atomic)
     end.
 
 %%================================================================================
@@ -129,7 +106,7 @@ store_batch(DB, Messages, Opts) ->
     metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
     n = 0 :: non_neg_integer(),
     n_bytes = 0 :: non_neg_integer(),
-    tref :: reference(),
+    tref :: undefined | reference(),
     queue :: queue:queue(emqx_types:message()),
     pending_replies = [] :: [gen_server:from()]
 }).
@@ -143,16 +120,18 @@ init([DB, Shard]) ->
         db = DB,
         shard = Shard,
         metrics_id = MetricsId,
-        tref = start_timer(),
         queue = queue:new()
     },
-    {ok, S}.
+    {ok, start_timer(S)}.
 
-handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) ->
-    do_enqueue(From, Sync, Msg, NBytes, S);
-handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) ->
-    Len = length(Batch),
-    do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S);
+handle_call(
+    #enqueue_req{
+        messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes
+    },
+    From,
+    S
+) ->
+    {noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -160,7 +139,7 @@ handle_cast(_Cast, S) ->
     {noreply, S}.
 
 handle_info(?flush, S) ->
-    {noreply, do_flush(S)};
+    {noreply, flush(S)};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -175,9 +154,60 @@ terminate(_Reason, _S) ->
 %% Internal functions
 %%================================================================================
 
+enqueue(
+    From,
+    Sync,
+    Atomic,
+    Msgs,
+    BatchSize,
+    BatchBytes,
+    S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0}
+) ->
+    %% At this point we don't split the batches, even when they aren't
+    %% atomic. It wouldn't win us anything in terms of memory, and
+    %% EMQX currently feeds data to DS in very small batches, so
+    %% granularity should be fine enough.
+    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
+    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
+    NMsgs = NMsgs0 + BatchSize,
+    NBytes = NBytes0 + BatchBytes,
+    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
+        true ->
+            %% Adding this batch would cause buffer to overflow. Flush
+            %% it now, and retry:
+            cancel_timer(S0),
+            S1 = flush(S0),
+            enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
+        false ->
+            %% The buffer is empty, we enqueue the atomic batch in its
+            %% entirety:
+            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
+            Replies =
+                case Sync of
+                    true ->
+                        [From | Replies0];
+                    false ->
+                        gen_server:reply(From, ok),
+                        Replies0
+                end,
+            S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies},
+            case NMsgs >= NMax orelse NBytes >= NBytes of
+                true ->
+                    cancel_timer(S1),
+                    flush(S1);
+                false ->
+                    S1
+            end
+    end.
+
 -define(COOLDOWN_MIN, 1000).
 -define(COOLDOWN_MAX, 5000).
 
+flush(S) ->
+    start_timer(do_flush(S)).
+
+do_flush(S0 = #s{n = 0}) ->
+    S0;
 do_flush(
     S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics}
 ) ->
@@ -202,73 +232,103 @@ do_flush(
                 n = 0,
                 n_bytes = 0,
                 queue = queue:new(),
-                pending_replies = [],
-                tref = start_timer()
+                pending_replies = []
             };
-        Error ->
-            emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id),
+        {error, recoverable, Reason} ->
+            %% Retry sending the batch:
+            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
             erlang:garbage_collect(),
+            %% We block the gen_server until the next retry.
+            BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            timer:sleep(BlockTime),
             ?tp(
                 warning,
                 emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, reason => Error}
+                #{db => DB, shard => Shard, reason => Reason}
             ),
-            Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            S;
+        Err = {error, unrecoverable, _} ->
+            emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
+            lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies),
+            erlang:garbage_collect(),
             S#s{
-                tref = start_timer(Cooldown)
+                n = 0,
+                n_bytes = 0,
+                queue = queue:new(),
+                pending_replies = []
             }
     end.
 
-do_enqueue(
-    From,
-    Sync,
-    MsgOrBatch,
-    BatchBytes,
-    S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies}
-) ->
-    NBytes = NBytes0 + BatchBytes,
-    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    S1 =
-        case MsgOrBatch of
-            {atomic, NumMsgs, Msgs} ->
-                Q = lists:foldl(fun queue:in/2, Q0, Msgs),
-                S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q};
-            Msg ->
-                S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)}
-        end,
-    %% TODO: later we may want to delay the reply until the message is
-    %% replicated, but it requies changes to the PUBACK/PUBREC flow to
-    %% allow for async replies. For now, we ack when the message is
-    %% _buffered_ rather than stored.
-    %%
-    %% Otherwise, the client would freeze for at least flush interval,
-    %% or until the buffer is filled.
-    S2 =
-        case Sync of
-            true ->
-                S1#s{pending_replies = [From | Replies]};
-            false ->
-                gen_server:reply(From, ok),
-                S1
+-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
+    [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}]
+when
+    NMessages :: non_neg_integer(),
+    NBytes :: non_neg_integer().
+shards_of_batch(DB, Messages) ->
+    maps:to_list(
+        lists:foldl(
+            fun(Message, Acc) ->
+                %% TODO: sharding strategy must be part of the DS DB schema:
+                Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                Size = payload_size(Message),
+                maps:update_with(
+                    Shard,
+                    fun({N, S}) ->
+                        {N + 1, S + Size}
+                    end,
+                    {1, Size},
+                    Acc
+                )
+            end,
+            #{},
+            Messages
+        )
+    ).
+
+repackage_messages(DB, Messages, Sync, Atomic) ->
+    Batches = lists:foldl(
+        fun(Message, Acc) ->
+            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+            Size = payload_size(Message),
+            maps:update_with(
+                Shard,
+                fun({N, S, Msgs}) ->
+                    {N + 1, S + Size, [Message | Msgs]}
+                end,
+                {1, Size, [Message]},
+                Acc
+            )
         end,
-    S =
-        case N >= NMax of
-            true ->
-                _ = erlang:cancel_timer(S2#s.tref),
-                do_flush(S2);
-            false ->
-                S2
+        #{},
+        Messages
+    ),
+    maps:foreach(
+        fun(Shard, {NMsgs, ByteSize, RevMessages}) ->
+            gen_server:call(
+                ?via(DB, Shard),
+                #enqueue_req{
+                    messages = lists:reverse(RevMessages),
+                    sync = Sync,
+                    atomic = Atomic,
+                    n_messages = NMsgs,
+                    payload_bytes = ByteSize
+                },
+                infinity
+            )
         end,
-    %% TODO: add a backpressure mechanism for the server to avoid
-    %% building a long message queue.
-    {noreply, S}.
+        Batches
+    ).
 
-start_timer() ->
+start_timer(S) ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
-    start_timer(Interval).
+    Tref = erlang:send_after(Interval, self(), ?flush),
+    S#s{tref = Tref}.
 
-start_timer(Interval) ->
-    erlang:send_after(Interval, self(), ?flush).
+cancel_timer(#s{tref = undefined}) ->
+    ok;
+cancel_timer(#s{tref = TRef}) ->
+    _ = erlang:cancel_timer(TRef),
+    ok.
 
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% all things into account, for example headers and extras)

+ 3 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -254,6 +254,9 @@ drop_shard(Shard) ->
 store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
+    ?tp(emqx_ds_storage_layer_store_batch, #{
+        shard => Shard, messages => Messages, options => Options
+    }),
     #{module := Mod, data := GenData} = generation_at(Shard, Time),
     Mod:store_batch(Shard, GenData, Messages, Options);
 store_batch(_Shard, [], _Options) ->

+ 87 - 16
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -322,17 +322,10 @@ t_09_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}),
+            ?assertMatch(#{batch := [_, _, _]}, Flush)
         end,
-        fun(Trace) ->
-            %% Must contain exactly one flush with all messages.
-            ?assertMatch(
-                [#{batch := [_, _, _]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
-            ),
-            ok
-        end
+        []
     ),
     ok.
 
@@ -355,14 +348,15 @@ t_10_non_atomic_store_batch(_Config) ->
                     sync => true
                 })
             ),
-
-            ok
+            timer:sleep(1000)
         end,
         fun(Trace) ->
             %% Should contain one flush per message.
+            Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)),
+            ?assertMatch([_], Batches),
             ?assertMatch(
-                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+                [_, _, _],
+                lists:append(Batches)
             ),
             ok
         end
@@ -681,10 +675,86 @@ t_error_mapping_replication_layer(_Config) ->
         length([error || {error, _, _} <- Results2]) > 0,
         Results2
     ),
-
-    snabbkaffe:stop(),
     meck:unload().
 
+%% This test suite verifies the behavior of `store_batch' operation
+%% when the underlying code experiences recoverable or unrecoverable
+%% problems.
+t_store_batch_fail(_Config) ->
+    ?check_trace(
+        #{timetrap => 15_000},
+        try
+            meck:new(emqx_ds_replication_layer, [passthrough, no_history]),
+            DB = ?FUNCTION_NAME,
+            ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+            %% Success:
+            Batch1 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
+            %% Inject unrecoverable error:
+            meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) ->
+                {error, unrecoverable, mock}
+            end),
+            Batch2 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
+            ],
+            ?assertMatch(
+                {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
+            ),
+            %% Inject a recoverable error:
+            Batch3 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
+                message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
+                message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
+                message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
+            ],
+            meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) ->
+                try
+                    ?tp(store_batch, #{messages => Messages}),
+                    meck:passthrough([DB, Shard, Messages])
+                catch
+                    _:_ ->
+                        {error, recoverable, mock}
+                end
+            end),
+            ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
+            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
+        after
+            meck:unload()
+        end,
+        [
+            {"number of successfull flushes after retry", fun(Trace) ->
+                ?assertMatch([_, _], ?of_kind(store_batch, Trace))
+            end},
+            {"number of retries", fun(Trace) ->
+                ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace))
+            end},
+            {"message ordering", fun(StoredMessages, _Trace) ->
+                [{_, Stream1}, {_, Stream2}] = StoredMessages,
+                ?assertMatch(
+                    [
+                        #message{payload = <<"1">>},
+                        #message{payload = <<"2">>},
+                        #message{payload = <<"5">>},
+                        #message{payload = <<"7">>}
+                    ],
+                    Stream1
+                ),
+                ?assertMatch(
+                    [
+                        #message{payload = <<"6">>},
+                        #message{payload = <<"8">>}
+                    ],
+                    Stream2
+                )
+            end}
+        ]
+    ).
+
 update_data_set() ->
     [
         [
@@ -748,6 +818,7 @@ init_per_testcase(_TC, Config) ->
     Config.
 
 end_per_testcase(_TC, _Config) ->
+    snabbkaffe:stop(),
     ok = application:stop(emqx_durable_storage),
     mria:stop(),
     _ = mnesia:delete_schema([node()]),

+ 9 - 2
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -63,9 +63,16 @@ consume(DB, TopicFilter) ->
     consume(DB, TopicFilter, 0).
 
 consume(DB, TopicFilter, StartTime) ->
-    Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     lists:flatmap(
-        fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end,
+      fun({_Stream, Msgs}) ->
+              Msgs
+      end,
+      consume_per_stream(DB, TopicFilter, StartTime)).
+
+consume_per_stream(DB, TopicFilter, StartTime) ->
+    Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    lists:map(
+        fun({_Rank, Stream}) -> {Stream, consume_stream(DB, Stream, TopicFilter, StartTime)} end,
         Streams
     ).
 

+ 12 - 14
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -212,7 +212,9 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
 
     ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
     ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)),
-    ok = add_collect_family(Callback, ds_metric_meta(), ?MG(ds_data, RawData)),
+    ok = add_collect_family(
+        Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData)
+    ),
     ok = maybe_license_add_collect_family(Callback, RawData),
     ok;
 collect_mf(_Registry, _Callback) ->
@@ -265,6 +267,7 @@ fetch_from_local_node(Mode) ->
         emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
         emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
         emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode),
+        ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode),
         mria_data => mria_data(Mode)
     }}.
 
@@ -481,7 +484,14 @@ emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []));
 emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
-emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])).
+emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, []));
+%% DS
+emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, []));
+emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])).
 
 %%--------------------------------------------------------------------
 %% Indicators
@@ -1012,18 +1022,6 @@ catch_all(DataFun) ->
         _:_ -> undefined
     end.
 
-%%========================================
-%% Durable storge
-%%========================================
-
-ds_metric_meta() ->
-    [
-     {emqx_ds_egress_batches, counter, 'egress.batches'},
-     {emqx_ds_egress_batches_retry, counter, 'egress.batches.retry'},
-     {emqx_ds_egress_messages, counter, 'egress.messages'},
-     {emqx_ds_egress_bytes, counter, 'egress.bytes'}
-    ].
-
 %%--------------------------------------------------------------------
 %% Collect functions
 %%--------------------------------------------------------------------