Pārlūkot izejas kodu

fix(ds): Limit the number of retries in egress to 0

ieQu1 1 gadu atpakaļ
vecāks
revīzija
f37ed3a40a

+ 2 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -551,6 +551,8 @@ list_nodes() ->
     end
     end
 ).
 ).
 
 
+-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
+    ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err.
 ra_store_batch(DB, Shard, Messages) ->
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
     Command = #{
         ?tag => ?BATCH,
         ?tag => ?BATCH,

+ 31 - 9
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -103,6 +103,11 @@ store_batch(DB, Messages, Opts) ->
     db :: emqx_ds:db(),
     db :: emqx_ds:db(),
     shard :: emqx_ds_replication_layer:shard_id(),
     shard :: emqx_ds_replication_layer:shard_id(),
     metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
     metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
+    n_retries = 0 :: non_neg_integer(),
+    %% FIXME: Currently max_retries is always 0, because replication
+    %% layer doesn't guarantee idempotency. Retrying would create
+    %% duplicate messages.
+    max_retries = 0 :: non_neg_integer(),
     n = 0 :: non_neg_integer(),
     n = 0 :: non_neg_integer(),
     n_bytes = 0 :: non_neg_integer(),
     n_bytes = 0 :: non_neg_integer(),
     tref :: undefined | reference(),
     tref :: undefined | reference(),
@@ -216,7 +221,15 @@ flush(S) ->
 do_flush(S0 = #s{n = 0}) ->
 do_flush(S0 = #s{n = 0}) ->
     S0;
     S0;
 do_flush(
 do_flush(
-    S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics}
+    S = #s{
+        queue = Q,
+        pending_replies = Replies,
+        db = DB,
+        shard = Shard,
+        metrics_id = Metrics,
+        n_retries = Retries,
+        max_retries = MaxRetries
+    }
 ) ->
 ) ->
     Messages = queue:to_list(Q),
     Messages = queue:to_list(Q),
     T0 = erlang:monotonic_time(microsecond),
     T0 = erlang:monotonic_time(microsecond),
@@ -240,7 +253,7 @@ do_flush(
                 queue = queue:new(),
                 queue = queue:new(),
                 pending_replies = []
                 pending_replies = []
             };
             };
-        {error, recoverable, Reason} ->
+        {timeout, ServerId} when Retries < MaxRetries ->
             %% Note: this is a hot loop, so we report error messages
             %% Note: this is a hot loop, so we report error messages
             %% with `debug' level to avoid wiping the logs. Instead,
             %% with `debug' level to avoid wiping the logs. Instead,
             %% error the detection must rely on the metrics. Debug
             %% error the detection must rely on the metrics. Debug
@@ -248,8 +261,8 @@ do_flush(
             %% via logger domain.
             %% via logger domain.
             ?tp(
             ?tp(
                 debug,
                 debug,
-                emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, reason => Reason, recoverable => true}
+                emqx_ds_replication_layer_egress_flush_retry,
+                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
             ),
             ),
             %% Retry sending the batch:
             %% Retry sending the batch:
             emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
             emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
@@ -257,21 +270,30 @@ do_flush(
             %% We block the gen_server until the next retry.
             %% We block the gen_server until the next retry.
             BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
             BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
             timer:sleep(BlockTime),
             timer:sleep(BlockTime),
-            S;
-        Err = {error, unrecoverable, Reason} ->
+            S#s{n_retries = Retries + 1};
+        Err ->
             ?tp(
             ?tp(
                 debug,
                 debug,
                 emqx_ds_replication_layer_egress_flush_failed,
                 emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, reason => Reason, recoverable => false}
+                #{db => DB, shard => Shard, error => Err}
             ),
             ),
             emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
             emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
-            lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies),
+            Reply =
+                case Err of
+                    {error, _, _} -> Err;
+                    {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}};
+                    _ -> {error, unrecoverable, Err}
+                end,
+            lists:foreach(
+                fun(From) -> gen_server:reply(From, Reply) end, Replies
+            ),
             erlang:garbage_collect(),
             erlang:garbage_collect(),
             S#s{
             S#s{
                 n = 0,
                 n = 0,
                 n_bytes = 0,
                 n_bytes = 0,
                 queue = queue:new(),
                 queue = queue:new(),
-                pending_replies = []
+                pending_replies = [],
+                n_retries = 0
             }
             }
     end.
     end.
 
 

+ 16 - 19
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -684,7 +684,7 @@ t_store_batch_fail(_Config) ->
     ?check_trace(
     ?check_trace(
         #{timetrap => 15_000},
         #{timetrap => 15_000},
         try
         try
-            meck:new(emqx_ds_replication_layer, [passthrough, no_history]),
+            meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
             DB = ?FUNCTION_NAME,
             DB = ?FUNCTION_NAME,
             ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
             ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
             %% Success:
             %% Success:
@@ -694,7 +694,7 @@ t_store_batch_fail(_Config) ->
             ],
             ],
             ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
             ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
             %% Inject unrecoverable error:
             %% Inject unrecoverable error:
-            meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) ->
+            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
                 {error, unrecoverable, mock}
                 {error, unrecoverable, mock}
             end),
             end),
             Batch2 = [
             Batch2 = [
@@ -704,35 +704,32 @@ t_store_batch_fail(_Config) ->
             ?assertMatch(
             ?assertMatch(
                 {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
                 {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
             ),
             ),
-            %% Inject a recoverable error:
+            meck:unload(emqx_ds_storage_layer),
+            %% Inject a recoveralbe error:
+            meck:new(ra, [passthrough, no_history]),
+            meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
+                ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
+                {timeout, mock}
+            end),
             Batch3 = [
             Batch3 = [
                 message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
                 message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
                 message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
                 message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
                 message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
                 message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
                 message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
                 message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
             ],
             ],
-            meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) ->
-                try
-                    ?tp(store_batch, #{messages => Messages}),
-                    meck:passthrough([DB, Shard, Messages])
-                catch
-                    _:_ ->
-                        {error, recoverable, mock}
-                end
-            end),
-            ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)),
+            %% Note: due to idempotency issues the number of retries
+            %% is currently set to 0:
+            ?assertMatch(
+                {error, recoverable, {timeout, mock}},
+                emqx_ds:store_batch(DB, Batch3, #{sync => true})
+            ),
+            meck:unload(ra),
             ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
             ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
             lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
             lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
         after
         after
             meck:unload()
             meck:unload()
         end,
         end,
         [
         [
-            {"number of successfull flushes after retry", fun(Trace) ->
-                ?assertMatch([_, _], ?of_kind(store_batch, Trace))
-            end},
-            {"number of retries", fun(Trace) ->
-                ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace))
-            end},
             {"message ordering", fun(StoredMessages, _Trace) ->
             {"message ordering", fun(StoredMessages, _Trace) ->
                 [{_, Stream1}, {_, Stream2}] = StoredMessages,
                 [{_, Stream1}, {_, Stream2}] = StoredMessages,
                 ?assertMatch(
                 ?assertMatch(