|
|
@@ -179,14 +179,18 @@ terminate(_Reason, _S) ->
|
|
|
-define(COOLDOWN_MAX, 5000).
|
|
|
|
|
|
do_flush(
|
|
|
- S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard}
|
|
|
+ S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics}
|
|
|
) ->
|
|
|
Messages = queue:to_list(Q),
|
|
|
- case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of
|
|
|
+ T0 = erlang:monotonic_time(microsecond),
|
|
|
+ Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages),
|
|
|
+ T1 = erlang:monotonic_time(microsecond),
|
|
|
+ emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
|
|
|
+ case Result of
|
|
|
ok ->
|
|
|
- emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id),
|
|
|
- emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n),
|
|
|
- emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes),
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
|
|
|
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
|
?tp(
|
|
|
emqx_ds_replication_layer_egress_flush,
|