Просмотр исходного кода

fix(dsrepl): handle errors gracefully in shard egress process

Also add cooldown on timeout / unavailability.
Andrew Mayorov 2 лет назад
Родитель
Сommit
887e151be5

+ 9 - 5
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -492,23 +492,27 @@ list_nodes() ->
 
 %%
 
+%% TODO
+%% Too large for normal operation, need better backpressure mechanism.
+-define(RA_TIMEOUT, 60 * 1000).
+
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,
         ?batch_messages => Messages
     },
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
-    case ra:process_command(Servers, Command) of
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
             Result;
         Error ->
-            error(Error, [DB, Shard])
+            Error
     end.
 
 ra_add_generation(DB, Shard) ->
     Command = #{?tag => add_generation},
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
-    case ra:process_command(Servers, Command) of
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
             Result;
         Error ->
@@ -518,7 +522,7 @@ ra_add_generation(DB, Shard) ->
 ra_update_config(DB, Shard, Opts) ->
     Command = #{?tag => update_config, ?config => Opts},
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
-    case ra:process_command(Servers, Command) of
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
             Result;
         Error ->
@@ -528,7 +532,7 @@ ra_update_config(DB, Shard, Opts) ->
 ra_drop_generation(DB, Shard, GenId) ->
     Command = #{?tag => drop_generation, ?generation => GenId},
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
-    case ra:process_command(Servers, Command) of
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
             Result;
         Error ->

+ 30 - 15
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -83,15 +83,13 @@ store_batch(DB, Messages, Opts) ->
             );
         true ->
             maps:foreach(
-                fun(Shard, Batch) ->
+                fun(Shard, BatchIn) ->
                     Timestamp = emqx_ds:timestamp_us(),
+                    Batch = [emqx_message:set_timestamp(Timestamp, Message) || Message <- BatchIn],
                     gen_server:call(
                         ?via(DB, Shard),
                         #enqueue_atomic_req{
-                            batch = [
-                                emqx_message:set_timestamp(Timestamp, Message)
-                             || Message <- Batch
-                            ],
+                            batch = Batch,
                             sync = Sync
                         },
                         infinity
@@ -156,22 +154,39 @@ terminate(_Reason, _S) ->
 %% Internal functions
 %%================================================================================
 
+-define(COOLDOWN_MIN, 1000).
+-define(COOLDOWN_MAX, 5000).
+
 do_flush(S = #s{batch = []}) ->
     S#s{tref = start_timer()};
 do_flush(
     S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
 ) ->
     %% FIXME
-    ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)),
-    [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
-    erlang:garbage_collect(),
-    S#s{
-        n = 0,
-        batch = [],
-        pending_replies = [],
-        tref = start_timer()
-    }.
+    case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
+        ok ->
+            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
+            ?tp(
+                emqx_ds_replication_layer_egress_flush,
+                #{db => DB, shard => Shard, batch => Messages}
+            ),
+            true = erlang:garbage_collect(),
+            S#s{
+                n = 0,
+                batch = [],
+                pending_replies = [],
+                tref = start_timer()
+            };
+        {error, Reason} ->
+            ?tp(
+                warning,
+                emqx_ds_replication_layer_egress_flush_failed,
+                #{db => DB, shard => Shard, reason => Reason}
+            ),
+            Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            ok = timer:sleep(Cooldown),
+            S#s{tref = start_timer()}
+    end.
 
 do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),