|
|
@@ -114,6 +114,7 @@ store_batch(DB, Messages, Opts) ->
|
|
|
init([DB, Shard]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
process_flag(message_queue_data, off_heap),
|
|
|
+ logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
|
|
|
MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
|
|
|
ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
|
|
|
S = #s{
|
|
|
@@ -234,19 +235,29 @@ do_flush(
|
|
|
pending_replies = []
|
|
|
};
|
|
|
{error, recoverable, Reason} ->
|
|
|
+ %% Note: this is a hot loop, so we report error messages
|
|
|
+ %% with `debug' level to avoid wiping the logs. Instead,
|
|
|
+ %% error detection must rely on the metrics. Debug
|
|
|
+ %% logging can be enabled for the particular egress server
|
|
|
+ %% via logger domain.
|
|
|
+ ?tp(
|
|
|
+ debug,
|
|
|
+ emqx_ds_replication_layer_egress_flush_failed,
|
|
|
+ #{db => DB, shard => Shard, reason => Reason, recoverable => true}
|
|
|
+ ),
|
|
|
%% Retry sending the batch:
|
|
|
emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
|
|
|
erlang:garbage_collect(),
|
|
|
%% We block the gen_server until the next retry.
|
|
|
BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
|
|
|
timer:sleep(BlockTime),
|
|
|
+ S;
|
|
|
+ Err = {error, unrecoverable, Reason} ->
|
|
|
?tp(
|
|
|
- warning,
|
|
|
+ debug,
|
|
|
emqx_ds_replication_layer_egress_flush_failed,
|
|
|
- #{db => DB, shard => Shard, reason => Reason}
|
|
|
+ #{db => DB, shard => Shard, reason => Reason, recoverable => false}
|
|
|
),
|
|
|
- S;
|
|
|
- Err = {error, unrecoverable, _} ->
|
|
|
emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
|
|
|
lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies),
|
|
|
erlang:garbage_collect(),
|