فهرست منبع

Merge remote-tracking branch 'origin/release-50' into 0508-prepare-for-e5.0.4

Zaiming (Stone) Shi 2 سال پیش
والد
کامیت
13dcb5732f
1فایلهای تغییر یافته به همراه19 افزوده شده و 9 حذف شده
  1. 19 9
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 19 - 9
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1432,16 +1432,16 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
 ack_inflight(undefined, _Ref, _Id, _Index) ->
     false;
 ack_inflight(InflightTID, Ref, Id, Index) ->
-    Count =
+    {Count, Removed} =
         case ets:take(InflightTID, Ref) of
             [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
-                1;
+                {1, true};
             [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
-                length(Batch);
+                {length(Batch), true};
             [] ->
-                0
+                {0, false}
         end,
-    ok = dec_inflight(InflightTID, Count),
+    ok = dec_inflight_remove(InflightTID, Count, Removed),
     IsKnownRef = (Count > 0),
     case IsKnownRef of
         true ->
@@ -1469,18 +1469,28 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
 %% used to update a batch after dropping expired individual queries.
 update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
-    ok = dec_inflight(InflightTID, NumExpired).
+    ok = dec_inflight_update(InflightTID, NumExpired).
 
 inc_inflight(InflightTID, Count) ->
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
     _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
     ok.
 
-dec_inflight(_InflightTID, 0) ->
+dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
     ok;
-dec_inflight(InflightTID, Count) when Count > 0 ->
-    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
+    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    ok;
+dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
+    %% If Count > 0, it must have been removed
     _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+    ok.
+
+dec_inflight_update(_InflightTID, _Count = 0) ->
+    ok;
+dec_inflight_update(InflightTID, Count) when Count > 0 ->
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
     ok.
 
 %%==============================================================================