Quellcode durchsuchen

Merge pull request #10020 from zmstone/0222-fix-bridge-async-mode-counters

fix(bridge): fix dropped counter and inflight gauge
Zaiming (Stone) Shi vor 3 Jahren
Ursprung
Commit
79bf77c2f1

+ 1 - 1
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -2,7 +2,7 @@ version: '3.9'
 
 services:
   zookeeper:
-    image: wurstmeister/zookeeper
+    image: docker.io/library/zookeeper:3.6
     ports:
       - "2181:2181"
     container_name: zookeeper

+ 1 - 1
apps/emqx/src/emqx_misc.erl

@@ -720,4 +720,4 @@ pub_props_to_packet(Properties) ->
 safe_filename(Filename) when is_binary(Filename) ->
     binary:replace(Filename, <<":">>, <<"-">>, [global]);
 safe_filename(Filename) when is_list(Filename) ->
-    string:replace(Filename, ":", "-", all).
+    lists:flatten(string:replace(Filename, ":", "-", all)).

+ 221 - 90
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -70,6 +70,18 @@
 -define(RETRY_IDX, 3).
 -define(WORKER_MREF_IDX, 4).
 
+-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
+    (fun() ->
+        IsFullBefore = is_inflight_full(InflightTID),
+        case (EXPR) of
+            blocked ->
+                ok;
+            ok ->
+                ok = maybe_flush_after_async_reply(IsFullBefore)
+        end
+    end)()
+).
+
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
@@ -97,6 +109,7 @@ start_link(Id, Index, Opts) ->
 
 -spec sync_query(id(), request(), query_opts()) -> Result :: term().
 sync_query(Id, Request, Opts0) ->
+    ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
     Opts1 = ensure_timeout_query_opts(Opts0, sync),
     Opts = ensure_expire_at(Opts1),
     PickKey = maps:get(pick_key, Opts, self()),
@@ -106,6 +119,7 @@ sync_query(Id, Request, Opts0) ->
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts0) ->
+    ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
     Opts1 = ensure_timeout_query_opts(Opts0, async),
     Opts = ensure_expire_at(Opts1),
     PickKey = maps:get(pick_key, Opts, self()),
@@ -121,6 +135,7 @@ simple_sync_query(Id, Request) ->
     %% call ends up calling buffering functions, that's a bug and
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
+    ?tp(simple_sync_query, #{id => Id, request => Request}),
     Index = undefined,
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
@@ -132,6 +147,7 @@ simple_sync_query(Id, Request) ->
 %% simple async-query the resource without batching and queuing.
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 simple_async_query(Id, Request, QueryOpts0) ->
+    ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
     Index = undefined,
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
@@ -194,8 +210,8 @@ init({Id, Index, Opts}) ->
     ?tp(buffer_worker_init, #{id => Id, index => Index}),
     {ok, running, Data}.
 
-running(enter, _, Data) ->
-    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}),
+running(enter, _, #{tref := _Tref} = Data) ->
+    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
     %% According to `gen_statem' laws, we mustn't call `maybe_flush'
     %% directly because it may decide to return `{next_state, blocked, _}',
     %% and that's an invalid response for a state enter call.
@@ -212,9 +228,8 @@ running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
     handle_query_requests(Request0, Data);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
-running(internal, flush, St) ->
-    flush(St);
 running(info, {flush, _Ref}, _St) ->
+    ?tp(discarded_stale_flush, #{}),
     keep_state_and_data;
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
@@ -225,21 +240,24 @@ running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
     keep_state_and_data.
 
-blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
+blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
     ?tp(buffer_worker_enter_blocked, #{}),
-    {keep_state_and_data, {state_timeout, ResumeT, unblock}};
+    %% discard the old timer, new timer will be started when entering running state again
+    St = cancel_flush_timer(St0),
+    {keep_state, St, {state_timeout, ResumeT, unblock}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
 blocked(cast, resume, St) ->
     resume_from_blocked(St);
-blocked(cast, flush, Data) ->
-    resume_from_blocked(Data);
+blocked(cast, flush, St) ->
+    resume_from_blocked(St);
 blocked(state_timeout, unblock, St) ->
     resume_from_blocked(St);
 blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
     Data = collect_and_enqueue_query_requests(Request0, Data0),
     {keep_state, Data};
 blocked(info, {flush, _Ref}, _Data) ->
+    %% ignore stale timer
     keep_state_and_data;
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
@@ -335,11 +353,13 @@ resume_from_blocked(Data) ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
             retry_inflight_sync(Ref, Query, Data);
+        {batch, Ref, NotExpired, []} ->
+            retry_inflight_sync(Ref, NotExpired, Data);
         {batch, Ref, NotExpired, Expired} ->
-            update_inflight_item(InflightTID, Ref, NotExpired),
             NumExpired = length(Expired),
+            ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
             emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
-            NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
+            ?tp(buffer_worker_retry_expired, #{expired => Expired}),
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
             retry_inflight_sync(Ref, NotExpired, Data)
@@ -470,9 +490,14 @@ flush(Data0) ->
     Data1 = cancel_flush_timer(Data0),
     CurrentCount = queue_count(Q0),
     IsFull = is_inflight_full(InflightTID),
-    ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
+    ?tp(buffer_worker_flush, #{
+        queued => CurrentCount,
+        is_inflight_full => IsFull,
+        inflight => inflight_count(InflightTID)
+    }),
     case {CurrentCount, IsFull} of
         {0, _} ->
+            ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}),
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
@@ -487,7 +512,7 @@ flush(Data0) ->
             %% if the request has expired, the caller is no longer
             %% waiting for a response.
             case sieve_expired_requests(Batch, Now) of
-                all_expired ->
+                {[], _AllExpired} ->
                     ok = replayq:ack(Q1, QAckRef),
                     emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
                     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@@ -496,7 +521,7 @@ flush(Data0) ->
                 {NotExpired, Expired} ->
                     NumExpired = length(Expired),
                     emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
-                    IsBatch = BatchSize =/= 1,
+                    IsBatch = (BatchSize > 1),
                     %% We *must* use the new queue, because we currently can't
                     %% `nack' a `pop'.
                     %% Maybe we could re-open the queue?
@@ -506,7 +531,6 @@ flush(Data0) ->
                     ),
                     Ref = make_request_ref(),
                     do_flush(Data2, #{
-                        new_queue => Q1,
                         is_batch => IsBatch,
                         batch => NotExpired,
                         ref => Ref,
@@ -519,18 +543,16 @@ flush(Data0) ->
     is_batch := boolean(),
     batch := [queue_query()],
     ack_ref := replayq:ack_ref(),
-    ref := inflight_key(),
-    new_queue := replayq:q()
+    ref := inflight_key()
 }) ->
     gen_statem:event_handler_result(state(), data()).
 do_flush(
-    Data0,
+    #{queue := Q1} = Data0,
     #{
         is_batch := false,
         batch := Batch,
         ref := Ref,
-        ack_ref := QAckRef,
-        new_queue := Q1
+        ack_ref := QAckRef
     }
 ) ->
     #{
@@ -606,16 +628,18 @@ do_flush(
                     }),
                     flush_worker(self());
                 false ->
+                    ?tp(buffer_worker_queue_drained, #{
+                        inflight => inflight_count(InflightTID)
+                    }),
                     ok
             end,
             {keep_state, Data1}
     end;
-do_flush(Data0, #{
+do_flush(#{queue := Q1} = Data0, #{
     is_batch := true,
     batch := Batch,
     ref := Ref,
-    ack_ref := QAckRef,
-    new_queue := Q1
+    ack_ref := QAckRef
 }) ->
     #{
         id := Id,
@@ -685,6 +709,9 @@ do_flush(Data0, #{
             Data2 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
+                        ?tp(buffer_worker_queue_drained, #{
+                            inflight => inflight_count(InflightTID)
+                        }),
                         Data1;
                     {true, true} ->
                         ?tp(buffer_worker_flush_ack_reflush, #{
@@ -718,13 +745,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
     {ShouldAck, PostFns} =
         lists:foldl(
             fun(Reply, {_ShouldAck, PostFns}) ->
+                %% _ShouldAck should be the same as ShouldAck starting from the second reply
                 {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
                 {ShouldAck, [PostFn | PostFns]}
             end,
             {ack, []},
             Replies
         ),
-    PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
+    PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
     {ShouldAck, PostFn}.
 
 reply_caller(Id, Reply, QueryOpts) ->
@@ -853,7 +881,7 @@ handle_async_worker_down(Data0, Pid) ->
     {keep_state, Data}.
 
 call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
-    ?tp(call_query_enter, #{id => Id, query => Query}),
+    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@@ -919,7 +947,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
                 inflight_tid => InflightTID,
                 request_ref => Ref,
                 query_opts => QueryOpts,
-                query => minimize(Query)
+                min_query => minimize(Query)
             },
             IsRetriable = false,
             WorkerMRef = undefined,
@@ -952,7 +980,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
                 inflight_tid => InflightTID,
                 request_ref => Ref,
                 query_opts => QueryOpts,
-                batch => minimize(Batch)
+                min_batch => minimize(Batch)
             },
             Requests = lists:map(
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
@@ -968,27 +996,39 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
     ).
 
 handle_async_reply(
+    #{
+        request_ref := Ref,
+        inflight_tid := InflightTID,
+        query_opts := Opts
+    } = ReplyContext,
+    Result
+) ->
+    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
+        discard ->
+            ok;
+        continue ->
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
+    end.
+
+handle_async_reply1(
     #{
         request_ref := Ref,
         inflight_tid := InflightTID,
         resource_id := Id,
         worker_index := Index,
-        buffer_worker := Pid,
-        query := ?QUERY(_, _, _, ExpireAt) = _Query
+        min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
 ) ->
     ?tp(
         handle_async_reply_enter,
-        #{batch_or_query => [_Query], ref => Ref}
+        #{batch_or_query => [_Query], ref => Ref, result => Result}
     ),
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsFullBefore = is_inflight_full(InflightTID),
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
         false ->
@@ -1003,7 +1043,7 @@ do_handle_async_reply(
         worker_index := Index,
         buffer_worker := Pid,
         inflight_tid := InflightTID,
-        query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
+        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
     },
     Result
 ) ->
@@ -1020,46 +1060,95 @@ do_handle_async_reply(
         ref => Ref,
         result => Result
     }),
-
     case Action of
         nack ->
             %% Keep retrying.
-            mark_inflight_as_retriable(InflightTID, Ref),
-            ?MODULE:block(Pid);
+            ok = mark_inflight_as_retriable(InflightTID, Ref),
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
-            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
 
 handle_async_batch_reply(
     #{
-        buffer_worker := Pid,
-        resource_id := Id,
-        worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
-        batch := Batch
+        query_opts := Opts
+    } = ReplyContext,
+    Result
+) ->
+    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
+        discard ->
+            ok;
+        continue ->
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
+    end.
+
+handle_async_batch_reply1(
+    #{
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        min_batch := Batch
     } = ReplyContext,
     Result
 ) ->
     ?tp(
         handle_async_reply_enter,
-        #{batch_or_query => Batch, ref => Ref}
+        #{batch_or_query => Batch, ref => Ref, result => Result}
     ),
     Now = now_(),
     case sieve_expired_requests(Batch, Now) of
-        all_expired ->
-            IsFullBefore = is_inflight_full(InflightTID),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
-            IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
-            ?tp(handle_async_reply_expired, #{expired => Batch}),
+        {_NotExpired, []} ->
+            %% this is the critical code path,
+            %% we try not to do ets:lookup in this case
+            %% because the batch can be quite big
+            do_handle_async_batch_reply(ReplyContext, Result);
+        {_NotExpired, _Expired} ->
+            %% at least one is expired
+            %% the batch from reply context is minimized, so it cannot be used
+            %% to update the inflight items, hence discard Batch and lookup the RealBatch
+            ?tp(handle_async_reply_expired, #{expired => _Expired}),
+            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
+    end.
+
+handle_async_batch_reply2([], _, _, _) ->
+    %% this usually should never happen unless the async callback is being evaluated concurrently
+    ok;
+handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
+    ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
+    #{
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        min_batch := Batch
+    } = ReplyContext,
+    %% All batch items share the same HasBeenSent flag
+    %% So we just take the original flag from the ReplyContext batch
+    %% and put it back to the batch found in inflight table
+    %% which must have already been set to `false`
+    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
+    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
+    RealNotExpired =
+        lists:map(
+            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
+                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
+            end,
+            RealNotExpired0
+        ),
+    NumExpired = length(RealExpired),
+    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
+    case RealNotExpired of
+        [] ->
+            %% all expired, no need to update back the inflight batch
+            _ = ack_inflight(InflightTID, Ref, Id, Index),
             ok;
-        {NotExpired, Expired} ->
-            NumExpired = length(Expired),
-            emqx_resource_metrics:late_reply_inc(Id, NumExpired),
-            NumExpired > 0 andalso
-                ?tp(handle_async_reply_expired, #{expired => Expired}),
-            do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result)
+        _ ->
+            %% some queries are not expired, put them back to the inflight batch
+            %% so it can be either acked now or retried later
+            ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
+            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
     end.
 
 do_handle_async_batch_reply(
@@ -1069,7 +1158,7 @@ do_handle_async_batch_reply(
         worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
-        batch := Batch,
+        min_batch := Batch,
         query_opts := QueryOpts
     },
     Result
@@ -1084,14 +1173,14 @@ do_handle_async_batch_reply(
     case Action of
         nack ->
             %% Keep retrying.
-            mark_inflight_as_retriable(InflightTID, Ref),
-            ?MODULE:block(Pid);
+            ok = mark_inflight_as_retriable(InflightTID, Ref),
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
-            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
 
-do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
-    IsFullBefore = is_inflight_full(InflightTID),
+do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
     IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
     case maps:get(simple_query, QueryOpts, false) of
         true ->
@@ -1101,9 +1190,47 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
         false ->
             ok
     end,
-    IsFullBefore andalso ?MODULE:flush_worker(WorkerPid),
     ok.
 
+maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
+    %% inflight was not full before async reply is handled,
+    %% after it is handled, the inflight table must be even smaller
+    %% hance we can rely on the buffer worker's flush timer to trigger
+    %% the next flush
+    ?tp(skip_flushing_worker, #{}),
+    ok;
+maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
+    %% the inflight table was full before handling aync reply
+    ?tp(do_flushing_worker, #{}),
+    ok = ?MODULE:flush_worker(self()).
+
+%% check if the async reply is valid.
+%% e.g. if a connector evaluates the callback more than once:
+%% 1. If the request was previously deleted from inflight table due to
+%%    either succeeded previously or expired, this function logs a
+%%    warning message and returns 'discard' instruction.
+%% 2. If the request was previously failed and now pending on a retry,
+%%    then this function will return 'continue' as there is no way to
+%%    tell if this reply is stae or not.
+maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
+    continue;
+maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
+    try ets:member(InflightTID, Ref) of
+        true ->
+            continue;
+        false ->
+            ?tp(
+                warning,
+                unknown_async_reply_discarded,
+                #{inflight_key => Ref}
+            ),
+            discard
+    catch
+        error:badarg ->
+            %% shutdown ?
+            discard
+    end.
+
 %%==============================================================================
 %% operations for queue
 queue_item_marshaller(Bin) when is_binary(Bin) ->
@@ -1202,10 +1329,8 @@ inflight_get_first_retriable(InflightTID, Now) ->
                     {single, Ref, Query}
             end;
         {[{Ref, Batch = [_ | _]}], _Continuation} ->
-            %% batch is non-empty because we check that in
-            %% `sieve_expired_requests'.
             case sieve_expired_requests(Batch, Now) of
-                all_expired ->
+                {[], _AllExpired} ->
                     {expired, Ref, Batch};
                 {NotExpired, Expired} ->
                     {batch, Ref, NotExpired, Expired}
@@ -1218,10 +1343,10 @@ is_inflight_full(InflightTID) ->
     [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
     %% we consider number of batches rather than number of messages
     %% because one batch request may hold several messages.
-    Size = inflight_num_batches(InflightTID),
+    Size = inflight_count(InflightTID),
     Size >= MaxSize.
 
-inflight_num_batches(InflightTID) ->
+inflight_count(InflightTID) ->
     case ets:info(InflightTID, size) of
         undefined -> 0;
         Size -> max(0, Size - ?INFLIGHT_META_ROWS)
@@ -1243,7 +1368,7 @@ inflight_append(
     InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     BatchSize = length(Batch),
-    IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
+    IsNew andalso inc_inflight(InflightTID, BatchSize),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
@@ -1258,7 +1383,7 @@ inflight_append(
     Query = mark_as_sent(Query0),
     InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
-    IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
+    IsNew andalso inc_inflight(InflightTID, 1),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
@@ -1274,6 +1399,8 @@ mark_inflight_as_retriable(undefined, _Ref) ->
     ok;
 mark_inflight_as_retriable(InflightTID, Ref) ->
     _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
+    %% the old worker's DOWN should not affect this inflight any more
+    _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}),
     ok.
 
 %% Track each worker pid only once.
@@ -1317,13 +1444,18 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
                 1;
             [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
                 length(Batch);
-            _ ->
+            [] ->
                 0
         end,
-    IsAcked = Count > 0,
-    IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
-    IsAcked.
+    ok = dec_inflight(InflightTID, Count),
+    IsKnownRef = (Count > 0),
+    case IsKnownRef of
+        true ->
+            emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
+        false ->
+            ok
+    end,
+    IsKnownRef.
 
 mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     #{inflight_tid := InflightTID} = Data,
@@ -1341,9 +1473,18 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     ok.
 
 %% used to update a batch after dropping expired individual queries.
-update_inflight_item(InflightTID, Ref, NewBatch) ->
+update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
-    ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
+    ok = dec_inflight(InflightTID, NumExpired).
+
+inc_inflight(InflightTID, Count) ->
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
+    ok.
+
+dec_inflight(_InflightTID, 0) ->
+    ok;
+dec_inflight(InflightTID, Count) when Count > 0 ->
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
     ok.
 
 %%==============================================================================
@@ -1453,22 +1594,12 @@ is_async_return(_) ->
     false.
 
 sieve_expired_requests(Batch, Now) ->
-    {Expired, NotExpired} =
-        lists:partition(
-            fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
-                is_expired(ExpireAt, Now)
-            end,
-            Batch
-        ),
-    case {NotExpired, Expired} of
-        {[], []} ->
-            %% Should be impossible for batch_size >= 1.
-            all_expired;
-        {[], [_ | _]} ->
-            all_expired;
-        {[_ | _], _} ->
-            {NotExpired, Expired}
-    end.
+    lists:partition(
+        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
+            not is_expired(ExpireAt, Now)
+        end,
+        Batch
+    ).
 
 -spec is_expired(infinity | integer(), integer()) -> boolean().
 is_expired(infinity = _ExpireAt, _Now) ->

+ 49 - 10
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -135,11 +135,11 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
     after 1000 ->
         {error, timeout}
     end;
-on_query(_InstId, {sleep, For}, #{pid := Pid}) ->
+on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => sync, for => For}),
     ReqRef = make_ref(),
     From = {self(), ReqRef},
-    Pid ! {From, {sleep, For}},
+    Pid ! {From, {sleep_before_reply, For}},
     receive
         {ReqRef, Result} ->
             Result
@@ -159,9 +159,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
 on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
     Pid ! {big_payload, Payload, ReplyFun},
     {ok, Pid};
-on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) ->
+on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => async, for => For}),
-    Pid ! {{sleep, For}, ReplyFun},
+    Pid ! {{sleep_before_reply, For}, ReplyFun},
     {ok, Pid}.
 
 on_batch_query(InstId, BatchReq, State) ->
@@ -173,10 +173,13 @@ on_batch_query(InstId, BatchReq, State) ->
         get_counter ->
             batch_get_counter(sync, InstId, State);
         {big_payload, _Payload} ->
-            batch_big_payload(sync, InstId, BatchReq, State)
+            batch_big_payload(sync, InstId, BatchReq, State);
+        {random_reply, Num} ->
+            %% async batch retried
+            make_random_reply(Num)
     end.
 
-on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
+on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) ->
     %% Requests can be of multiple types, but cannot be mixed.
     case hd(BatchReq) of
         {inc_counter, _} ->
@@ -186,7 +189,11 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
         block_now ->
             on_query_async(InstId, block_now, ReplyFunAndArgs, State);
         {big_payload, _Payload} ->
-            batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State)
+            batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State);
+        {random_reply, Num} ->
+            %% only take the first Num in the batch should be random enough
+            Pid ! {{random_reply, Num}, ReplyFunAndArgs},
+            {ok, Pid}
     end.
 
 batch_inc_counter(CallMode, InstId, BatchReq, State) ->
@@ -299,16 +306,33 @@ counter_loop(
             {{FromPid, ReqRef}, get} ->
                 FromPid ! {ReqRef, Num},
                 State;
-            {{sleep, _} = SleepQ, ReplyFun} ->
+            {{random_reply, RandNum}, ReplyFun} ->
+                %% usually a behaving  connector should reply once and only once for
+                %% each (batch) request
+                %% but we try to reply random results a random number of times
+                %% with 'ok' in the result, the buffer worker should eventually
+                %% drain the buffer (and inflights table)
+                ReplyCount = 1 + (RandNum rem 3),
+                Results = make_random_replies(ReplyCount),
+                %% add a delay to trigger inflight full
+                lists:foreach(
+                    fun(Result) ->
+                        timer:sleep(rand:uniform(5)),
+                        apply_reply(ReplyFun, Result)
+                    end,
+                    Results
+                ),
+                State;
+            {{sleep_before_reply, _} = SleepQ, ReplyFun} ->
                 apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
                 State;
-            {{FromPid, ReqRef}, {sleep, _} = SleepQ} ->
+            {{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} ->
                 FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
                 State
         end,
     counter_loop(NewState).
 
-handle_query(Mode, {sleep, For} = Query, Status) ->
+handle_query(Mode, {sleep_before_reply, For} = Query, Status) ->
     ok = timer:sleep(For),
     Result =
         case Status of
@@ -329,3 +353,18 @@ maybe_register(_Name, _Pid, false) ->
 
 apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
     apply(ReplyFun, Args ++ [Result]).
+
+make_random_replies(0) ->
+    [];
+make_random_replies(N) ->
+    [make_random_reply(N) | make_random_replies(N - 1)].
+
+make_random_reply(N) ->
+    case rand:uniform(3) of
+        1 ->
+            {ok, N};
+        2 ->
+            {error, {recoverable_error, N}};
+        3 ->
+            {error, {unrecoverable_error, N}}
+    end.

+ 110 - 18
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1482,7 +1482,7 @@ t_retry_async_inflight_full(_Config) ->
                         AsyncInflightWindow * 2,
                         fun() ->
                             For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
-                            {sleep, For}
+                            {sleep_before_reply, For}
                         end,
                         #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
                     ),
@@ -1507,6 +1507,59 @@ t_retry_async_inflight_full(_Config) ->
     ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
     ok.
 
+%% this test case is to ensure the buffer worker will not go crazy even
+%% if the underlying connector is misbehaving: evaluate async callbacks multiple times
+t_async_reply_multi_eval(_Config) ->
+    ResumeInterval = 5,
+    TotalTime = 5_000,
+    AsyncInflightWindow = 3,
+    TotalQueries = AsyncInflightWindow * 5,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => ?FUNCTION_NAME},
+        #{
+            query_mode => async,
+            async_inflight_window => AsyncInflightWindow,
+            batch_size => 3,
+            batch_time => 10,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    %% block
+    ok = emqx_resource:simple_sync_query(?ID, block),
+    inc_counter_in_parallel(
+        TotalQueries,
+        fun() ->
+            Rand = rand:uniform(1000),
+            {random_reply, Rand}
+        end,
+        #{}
+    ),
+    ?retry(
+        ResumeInterval,
+        TotalTime div ResumeInterval,
+        begin
+            Metrics = tap_metrics(?LINE),
+            #{
+                counters := Counters,
+                gauges := #{queuing := 0, inflight := 0}
+            } = Metrics,
+            #{
+                matched := Matched,
+                success := Success,
+                dropped := Dropped,
+                late_reply := LateReply,
+                failed := Failed
+            } = Counters,
+            ?assertEqual(TotalQueries, Matched - 1),
+            ?assertEqual(Matched, Success + Dropped + LateReply + Failed)
+        end
+    ).
+
 t_retry_async_inflight_batch(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(async_if_possible),
@@ -1944,7 +1997,7 @@ t_expiration_async_batch_after_reply(_Config) ->
         #{name => test_resource},
         #{
             query_mode => async,
-            batch_size => 2,
+            batch_size => 3,
             batch_time => 100,
             worker_pool_size => 1,
             resume_interval => 2_000
@@ -1959,7 +2012,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
             NAcks =
                 case IsBatch of
                     batch -> 1;
-                    single -> 2
+                    single -> 3
                 end,
             ?force_ordering(
                 #{?snk_kind := buffer_worker_flush_ack},
@@ -1980,6 +2033,10 @@ do_t_expiration_async_after_reply(IsBatch) ->
                 ok,
                 emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
             ),
+            ?assertEqual(
+                ok,
+                emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
+            ),
             ?assertEqual(
                 ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
             ),
@@ -1997,30 +2054,44 @@ do_t_expiration_async_after_reply(IsBatch) ->
             {ok, _} = ?block_until(
                 #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
             ),
+            wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
 
             unlink(Pid0),
             exit(Pid0, kill),
             ok
         end,
         fun(Trace) ->
-            ?assertMatch(
-                [
-                    #{
-                        expired := [{query, _, {inc_counter, 199}, _, _}]
-                    }
-                ],
-                ?of_kind(handle_async_reply_expired, Trace)
-            ),
-            wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
+            case IsBatch of
+                batch ->
+                    ?assertMatch(
+                        [
+                            #{
+                                expired := [
+                                    {query, _, {inc_counter, 199}, _, _},
+                                    {query, _, {inc_counter, 299}, _, _}
+                                ]
+                            }
+                        ],
+                        ?of_kind(handle_async_reply_expired, Trace)
+                    );
+                single ->
+                    ?assertMatch(
+                        [
+                            #{expired := [{query, _, {inc_counter, 199}, _, _}]},
+                            #{expired := [{query, _, {inc_counter, 299}, _, _}]}
+                        ],
+                        ?of_kind(handle_async_reply_expired, Trace)
+                    )
+            end,
             Metrics = tap_metrics(?LINE),
             ?assertMatch(
                 #{
                     counters := #{
-                        matched := 2,
+                        matched := 3,
                         %% the request with infinity timeout.
                         success := 1,
                         dropped := 0,
-                        late_reply := 1,
+                        late_reply := 2,
                         retried := 0,
                         failed := 0
                     }
@@ -2042,7 +2113,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
         #{name => test_resource},
         #{
             query_mode => async,
-            batch_size => 2,
+            batch_size => 3,
             batch_time => 100,
             worker_pool_size => 1,
             resume_interval => ResumeInterval
@@ -2067,6 +2138,10 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
                 ok,
                 emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
             ),
+            ?assertEqual(
+                ok,
+                emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
+            ),
             Pid0 =
                 spawn_link(fun() ->
                     ?tp(delay_enter, #{}),
@@ -2087,7 +2162,10 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
             ?assertMatch(
                 [
                     #{
-                        expired := [{query, _, {inc_counter, 199}, _, _}]
+                        expired := [
+                            {query, _, {inc_counter, 199}, _, _},
+                            {query, _, {inc_counter, 299}, _, _}
+                        ]
                     }
                 ],
                 ?of_kind(handle_async_reply_expired, Trace)
@@ -2096,12 +2174,16 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
             ?assertMatch(
                 #{
                     counters := #{
-                        matched := 1,
+                        matched := 2,
                         success := 0,
                         dropped := 0,
-                        late_reply := 1,
+                        late_reply := 2,
                         retried := 0,
                         failed := 0
+                    },
+                    gauges := #{
+                        inflight := 0,
+                        queuing := 0
                     }
                 },
                 Metrics
@@ -2217,6 +2299,16 @@ do_t_expiration_retry(IsBatch) ->
                 [#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
                 ?of_kind(buffer_worker_retry_expired, Trace)
             ),
+            Metrics = tap_metrics(?LINE),
+            ?assertMatch(
+                #{
+                    gauges := #{
+                        inflight := 0,
+                        queuing := 0
+                    }
+                },
+                Metrics
+            ),
             ok
         end
     ),

+ 1 - 0
changes/ce/fix-10020.en.md

@@ -0,0 +1 @@
+Fix bridge metrics when running in async mode with batching enabled (`batch_size` > 1).

+ 1 - 0
changes/ce/fix-10020.zh.md

@@ -0,0 +1 @@
+修复使用异步和批量配置的桥接计数不准确的问题。

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -268,7 +268,7 @@ kafka_bridge_rest_api_helper(Config) ->
     CreateBodyTmp = #{
         <<"type">> => <<"kafka">>,
         <<"name">> => <<"my_kafka_bridge">>,
-        <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config),
+        <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)),
         <<"enable">> => true,
         <<"authentication">> => maps:get(<<"authentication">>, Config),
         <<"producer">> => #{
@@ -276,7 +276,7 @@ kafka_bridge_rest_api_helper(Config) ->
                 topic => <<"t/#">>
             },
             <<"kafka">> => #{
-                <<"topic">> => erlang:list_to_binary(KafkaTopic),
+                <<"topic">> => iolist_to_binary(KafkaTopic),
                 <<"buffer">> => #{
                     <<"memory_overload_protection">> => <<"false">>
                 },

+ 11 - 5
scripts/ct/run.sh

@@ -21,6 +21,12 @@ help() {
     echo "                        otherwise it runs the entire app's CT"
 }
 
+if command -v docker-compose; then
+    DC='docker-compose'
+else
+    DC='docker compose'
+fi
+
 WHICH_APP='novalue'
 CONSOLE='no'
 KEEP_UP='no'
@@ -155,7 +161,7 @@ for dep in ${CT_DEPS}; do
             ;;
         tdengine)
             FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' )
-            ;; 
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1
@@ -201,7 +207,7 @@ if [ "$STOP" = 'no' ]; then
     # some left-over log file has to be deleted before a new docker-compose up
     rm -f '.ci/docker-compose-file/redis/*.log'
     # shellcheck disable=2086 # no quotes for F_OPTIONS
-    docker compose $F_OPTIONS up -d --build --remove-orphans
+    $DC $F_OPTIONS up -d --build --remove-orphans
 fi
 
 echo "Fixing file owners and permissions for $UID_GID"
@@ -218,7 +224,7 @@ set +e
 
 if [ "$STOP" = 'yes' ]; then
     # shellcheck disable=2086 # no quotes for F_OPTIONS
-    docker compose $F_OPTIONS down --remove-orphans
+    $DC $F_OPTIONS down --remove-orphans
 elif [ "$ATTACH" = 'yes' ]; then
     docker exec -it "$ERLANG_CONTAINER" bash
 elif [ "$CONSOLE" = 'yes' ]; then
@@ -235,11 +241,11 @@ else
         LOG='_build/test/logs/docker-compose.log'
         echo "Dumping docker-compose log to $LOG"
         # shellcheck disable=2086 # no quotes for F_OPTIONS
-        docker compose $F_OPTIONS logs --no-color --timestamps > "$LOG"
+        $DC $F_OPTIONS logs --no-color --timestamps > "$LOG"
     fi
     if [ "$KEEP_UP" != 'yes' ]; then
         # shellcheck disable=2086 # no quotes for F_OPTIONS
-        docker compose $F_OPTIONS down
+        $DC $F_OPTIONS down
     fi
     exit $RESULT
 fi