|
|
@@ -482,14 +482,16 @@ flush(Data0) ->
|
|
|
Data1 = cancel_flush_timer(Data0),
|
|
|
CurrentCount = queue_count(Q0),
|
|
|
IsFull = is_inflight_full(InflightTID),
|
|
|
- ?tp(buffer_worker_flush, #{
|
|
|
+ ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{
|
|
|
queued => CurrentCount,
|
|
|
is_inflight_full => IsFull,
|
|
|
inflight => inflight_count(InflightTID)
|
|
|
}),
|
|
|
case {CurrentCount, IsFull} of
|
|
|
{0, _} ->
|
|
|
- ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}),
|
|
|
+ ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
|
|
|
+ inflight => inflight_count(InflightTID)
|
|
|
+ }),
|
|
|
{keep_state, Data1};
|
|
|
{_, true} ->
|
|
|
?tp(buffer_worker_flush_but_inflight_full, #{}),
|
|
|
@@ -620,7 +622,7 @@ do_flush(
|
|
|
}),
|
|
|
flush_worker(self());
|
|
|
false ->
|
|
|
- ?tp(buffer_worker_queue_drained, #{
|
|
|
+ ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
|
|
|
inflight => inflight_count(InflightTID)
|
|
|
}),
|
|
|
ok
|
|
|
@@ -701,7 +703,7 @@ do_flush(#{queue := Q1} = Data0, #{
|
|
|
Data2 =
|
|
|
case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
|
|
{false, _} ->
|
|
|
- ?tp(buffer_worker_queue_drained, #{
|
|
|
+ ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
|
|
|
inflight => inflight_count(InflightTID)
|
|
|
}),
|
|
|
Data1;
|
|
|
@@ -1279,13 +1281,10 @@ append_queue(Id, Index, Q, Queries) ->
|
|
|
%% the inflight queue for async query
|
|
|
-define(MAX_SIZE_REF, max_size).
|
|
|
-define(SIZE_REF, size).
|
|
|
+-define(BATCH_COUNT_REF, batch_count).
|
|
|
-define(INITIAL_TIME_REF, initial_time).
|
|
|
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
|
|
|
|
|
|
-%% NOTE
|
|
|
-%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝
|
|
|
--define(INFLIGHT_META_ROWS, 4).
|
|
|
-
|
|
|
inflight_new(InfltWinSZ, Id, Index) ->
|
|
|
TableId = ets:new(
|
|
|
emqx_resource_buffer_worker_inflight_tab,
|
|
|
@@ -1295,6 +1294,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
|
|
|
%% we use this counter because we might deal with batches as
|
|
|
%% elements.
|
|
|
inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
|
|
|
+ inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index),
|
|
|
inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
|
|
|
inflight_append(
|
|
|
TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
|
|
|
@@ -1344,10 +1344,7 @@ is_inflight_full(InflightTID) ->
|
|
|
Size >= MaxSize.
|
|
|
|
|
|
inflight_count(InflightTID) ->
|
|
|
- case ets:info(InflightTID, size) of
|
|
|
- undefined -> 0;
|
|
|
- Size -> max(0, Size - ?INFLIGHT_META_ROWS)
|
|
|
- end.
|
|
|
+ emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0).
|
|
|
|
|
|
inflight_num_msgs(InflightTID) ->
|
|
|
[{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
|
|
|
@@ -1476,12 +1473,14 @@ update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
|
|
|
|
|
|
inc_inflight(InflightTID, Count) ->
|
|
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
|
|
|
+ _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
|
|
ok.
|
|
|
|
|
|
dec_inflight(_InflightTID, 0) ->
|
|
|
ok;
|
|
|
dec_inflight(InflightTID, Count) when Count > 0 ->
|
|
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
|
|
+ _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
|
|
ok.
|
|
|
|
|
|
%%==============================================================================
|