Просмотр исходного кода

feat: add inflight window to emqx_resource

Shawn 3 лет назад
Родитель
Сommit
6203a01320

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "An OTP application"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 2 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -27,7 +27,8 @@
 -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type query_opts() :: #{
     %% The key used for picking a resource worker
-    pick_key => term()
+    pick_key => term(),
+    async_reply_fun => reply_fun()
 }.
 -type resource_data() :: #{
     id := resource_id(),

+ 1 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -76,6 +76,7 @@
     stop/1,
     %% query the instance
     query/2,
+    query/3,
     %% query the instance without batching and queuing messages.
     simple_sync_query/2,
     simple_async_query/3

+ 160 - 64
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -50,7 +50,7 @@
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 
--export([reply_after_query/4, batch_reply_after_query/4]).
+-export([reply_after_query/6, batch_reply_after_query/6]).
 
 -define(RESUME_INTERVAL, 15000).
 
@@ -69,18 +69,18 @@
     {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
 ).
 -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
+-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
+-define(DEFAULT_INFLIGHT, 100).
 
 -type id() :: binary().
 -type query() :: {query, from(), request()}.
 -type request() :: term().
 -type from() :: pid() | reply_fun().
 
--export_type([query_opts/0]).
-
 -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
     {{from(), result()}, NewCbState :: term()}.
 
-callback_mode() -> [state_functions].
+callback_mode() -> [state_functions, state_enter].
 
 start_link(Id, Index, Opts) ->
     gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
@@ -89,18 +89,18 @@ start_link(Id, Index, Opts) ->
 query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
-    pick_call(Id, PickKey, {query, Request}, Timeout).
+    pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
-    Result = call_query(sync, Id, ?QUERY(self(), Request), 1),
+    Result = call_query(sync, Id, ?QUERY(self(), Request), #{}),
     _ = handle_query_result(Id, Result, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
-    Result = call_query(async, Id, ?QUERY(ReplyFun, Request), 1),
+    Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}),
     _ = handle_query_result(Id, Result, false),
     Result.
 
@@ -119,38 +119,44 @@ resume(ServerRef) ->
 init({Id, Index, Opts}) ->
     process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Id, {Id, Index}),
+    Name = name(Id, Index),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     Queue =
         case maps:get(queue_enabled, Opts, false) of
             true ->
                 replayq:open(#{
                     dir => disk_queue_dir(Id, Index),
-                    seg_bytes => 10000000,
+                    seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
                     sizer => fun ?MODULE:estimate_size/1,
                     marshaller => fun ?MODULE:queue_item_marshaller/1
                 });
             false ->
                 undefined
         end,
+    ok = inflight_new(Name),
     St = #{
         id => Id,
         index => Index,
+        name => Name,
         %% query_mode = dynamic | sync | async
         %% TODO:
         %%  dynamic mode is async mode when things are going well, but becomes sync mode
         %%  if the resource worker is overloaded
         query_mode => maps:get(query_mode, Opts, sync),
-        async_reply_fun => maps:get(async_reply_fun, Opts, undefined),
+        async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
         batch_enabled => maps:get(batch_enabled, Opts, false),
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
+        resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL),
         acc => [],
         acc_left => BatchSize,
         tref => undefined
     },
     {ok, blocked, St, {next_event, cast, resume}}.
 
+running(enter, _, _St) ->
+    keep_state_and_data;
 running(cast, resume, _St) ->
     keep_state_and_data;
 running(cast, block, St) ->
@@ -158,8 +164,8 @@ running(cast, block, St) ->
 running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
     Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
     {next_state, block, St#{queue := Q1}};
-running({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
-    From = maybe_quick_return(QM, From0, ReplyFun),
+running({call, From0}, {query, Request, Opts}, #{query_mode := QM} = St) ->
+    From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)),
     query_or_acc(From, Request, St);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
@@ -169,6 +175,8 @@ running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, info => Info}),
     keep_state_and_data.
 
+blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
+    {keep_state_and_data, {state_timeout, ResumeT, resume}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
 blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
@@ -178,9 +186,11 @@ blocked(cast, resume, St) ->
     do_resume(St);
 blocked(state_timeout, resume, St) ->
     do_resume(St);
-blocked({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
-    From = maybe_quick_return(QM, From0, ReplyFun),
-    handle_blocked(From, Request, St).
+blocked({call, From0}, {query, Request, Opts}, #{id := Id, queue := Q, query_mode := QM} = St) ->
+    From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)),
+    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
+    _ = reply_caller(Id, ?REPLY(From, Request, Error)),
+    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}.
 
 terminate(_Reason, #{id := Id, index := Index}) ->
     gproc_pool:disconnect_worker(Id, {Id, Index}).
@@ -216,30 +226,44 @@ pick_call(Id, Key, Query, Timeout) ->
             ?RESOURCE_ERROR(timeout, "call resource timeout")
     end.
 
-do_resume(#{queue := undefined} = St) ->
+do_resume(#{queue := Q, id := Id, name := Name} = St) ->
+    case inflight_get_first(Name) of
+        empty ->
+            retry_first_from_queue(Q, Id, St);
+        {Ref, FirstQuery} ->
+            retry_first_sync(Id, FirstQuery, Name, Ref, undefined, St)
+    end.
+
+retry_first_from_queue(undefined, _Id, St) ->
     {next_state, running, St};
-do_resume(#{queue := Q, id := Id} = St) ->
+retry_first_from_queue(Q, Id, St) ->
     case replayq:peek(Q) of
         empty ->
             {next_state, running, St};
         ?Q_ITEM(FirstQuery) ->
-            Result = call_query(sync, Id, FirstQuery, 1),
-            case handle_query_result(Id, Result, false) of
-                %% Send failed because resource down
-                true ->
-                    {keep_state, St, {state_timeout, ?RESUME_INTERVAL, resume}};
-                %% Send ok or failed but the resource is working
-                false ->
-                    %% We Send 'resume' to the end of the mailbox to give the worker
-                    %% a chance to process 'query' requests.
-                    {keep_state, St#{queue => drop_head(Q)}, {state_timeout, 0, resume}}
-            end
+            retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St)
     end.
 
-handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
-    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
-    _ = reply_caller(Id, ?REPLY(From, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}.
+retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) ->
+    Result = call_query(sync, Id, FirstQuery, #{}),
+    case handle_query_result(Id, Result, false) of
+        %% Send failed because resource down
+        true ->
+            {keep_state, St0, {state_timeout, ResumeT, resume}};
+        %% Send ok or failed but the resource is working
+        false ->
+            %% We Send 'resume' to the end of the mailbox to give the worker
+            %% a chance to process 'query' requests.
+            St =
+                case Q of
+                    undefined ->
+                        inflight_drop(Name, Ref),
+                        St0;
+                    _ ->
+                        St0#{queue => drop_head(Q)}
+                end,
+            {keep_state, St, {state_timeout, 0, resume}}
+    end.
 
 drop_head(Q) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
@@ -254,7 +278,11 @@ query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Lef
         false -> {keep_state, ensure_flush_timer(St)}
     end;
 query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) ->
-    case send_query(QM, From, Request, Id) of
+    QueryOpts = #{
+        inflight_name => maps:get(name, St),
+        inflight_window => maps:get(async_inflight_window, St)
+    },
+    case send_query(QM, From, Request, Id, QueryOpts) of
         true ->
             Query = ?QUERY(From, Request),
             {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
@@ -262,8 +290,8 @@ query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, quer
             {keep_state, St}
     end.
 
-send_query(QM, From, Request, Id) ->
-    Result = call_query(QM, Id, ?QUERY(From, Request), 1),
+send_query(QM, From, Request, Id, QueryOpts) ->
+    Result = call_query(QM, Id, ?QUERY(From, Request), QueryOpts),
     reply_caller(Id, ?REPLY(From, Request, Result)).
 
 flush(#{acc := []} = St) ->
@@ -277,7 +305,11 @@ flush(
         query_mode := QM
     } = St
 ) ->
-    Result = call_query(QM, Id, Batch, length(Batch)),
+    QueryOpts = #{
+        inflight_name => maps:get(name, St),
+        inflight_window => maps:get(async_inflight_window, St)
+    },
+    Result = call_query(QM, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
@@ -332,21 +364,21 @@ handle_query_result(Id, {error, _}, BlockWorker) ->
 handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
     true;
+handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
+    true;
 handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
     true;
 handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
     BlockWorker;
 handle_query_result(Id, Result, BlockWorker) ->
-    %% assert
-    true = is_ok_result(Result),
+    assert_ok_result(Result),
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
     BlockWorker.
 
-call_query(QM, Id, Query, QueryLen) ->
+call_query(QM, Id, Query, QueryOpts) ->
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} ->
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, QueryLen),
-            apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt);
+            apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@@ -372,58 +404,119 @@ call_query(QM, Id, Query, QueryLen) ->
     end
 ).
 
-apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt) ->
+apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
     ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
-apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt) ->
+apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
-    ReplyFun = fun ?MODULE:reply_after_query/4,
+    Name = maps:get(inflight_name, QueryOpts, undefined),
+    WinSize = maps:get(inflight_window, QueryOpts, undefined),
     ?APPLY_RESOURCE(
-        begin
-            Result = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt),
-            {async_return, Result}
+        case inflight_is_full(Name, WinSize) of
+            true ->
+                ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
+                {async_return, inflight_full};
+            false ->
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+                ReplyFun = fun ?MODULE:reply_after_query/6,
+                Ref = make_message_ref(),
+                Args = [self(), Id, Name, Ref, Query],
+                ok = inflight_append(Name, Ref, Query),
+                Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
+                {async_return, Result}
         end,
         Request
     );
-apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
+apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
     ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
-apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
+apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
-    Requests = [Request || ?QUERY(_From, Request) <- Batch],
-    ReplyFun = fun ?MODULE:batch_reply_after_query/4,
+    Name = maps:get(inflight_name, QueryOpts, undefined),
+    WinSize = maps:get(inflight_window, QueryOpts, undefined),
     ?APPLY_RESOURCE(
-        begin
-            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt),
-            {async_return, Result}
+        case inflight_is_full(Name, WinSize) of
+            true ->
+                ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
+                {async_return, inflight_full};
+            false ->
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+                ReplyFun = fun ?MODULE:batch_reply_after_query/6,
+                Ref = make_message_ref(),
+                Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
+                Requests = [Request || ?QUERY(_From, Request) <- Batch],
+                ok = inflight_append(Name, Ref, Batch),
+                Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt),
+                {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(Pid, Id, ?QUERY(From, Request) = Query, Result) ->
+reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
     case reply_caller(Id, ?REPLY(From, Request, Result)) of
-        true -> ?MODULE:block(Pid, [Query]);
-        false -> ok
+        true -> ?MODULE:block(Pid);
+        false -> inflight_drop(Name, Ref)
     end.
 
-batch_reply_after_query(Pid, Id, Batch, Result) ->
+batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     case batch_reply_caller(Id, Result, Batch) of
-        true -> ?MODULE:block(Pid, Batch);
-        false -> ok
+        true -> ?MODULE:block(Pid);
+        false -> inflight_drop(Name, Ref)
+    end.
+%%==============================================================================
+%% the inflight queue for async query
+
+inflight_new(Name) ->
+    _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
+    ok.
+
+inflight_get_first(Name) ->
+    case ets:first(Name) of
+        '$end_of_table' ->
+            empty;
+        Ref ->
+            case ets:lookup(Name, Ref) of
+                [Object] -> Object;
+                [] -> inflight_get_first(Name)
+            end
+    end.
+
+inflight_is_full(undefined, _) ->
+    false;
+inflight_is_full(Name, MaxSize) ->
+    case ets:info(Name, size) of
+        Size when Size >= MaxSize -> true;
+        _ -> false
     end.
 
+inflight_append(undefined, _Ref, _Query) ->
+    ok;
+inflight_append(Name, Ref, Query) ->
+    ets:insert(Name, {Ref, Query}),
+    ok.
+
+inflight_drop(undefined, _) ->
+    ok;
+inflight_drop(Name, Ref) ->
+    ets:delete(Name, Ref),
+    ok.
+
 %%==============================================================================
 call_mode(sync, _) -> sync;
 call_mode(async, always_sync) -> sync;
 call_mode(async, async_if_possible) -> async.
 
-is_ok_result(ok) ->
+assert_ok_result(ok) ->
     true;
-is_ok_result(R) when is_tuple(R) ->
-    erlang:element(1, R) == ok;
-is_ok_result(_) ->
-    false.
+assert_ok_result({async_return, R}) ->
+    assert_ok_result(R);
+assert_ok_result(R) when is_tuple(R) ->
+    ok = erlang:element(1, R);
+assert_ok_result(R) ->
+    error({not_ok_result, R}).
 
 -spec name(id(), integer()) -> atom().
 name(Id, Index) ->
@@ -447,3 +540,6 @@ cancel_flush_timer(St = #{tref := undefined}) ->
 cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
     _ = erlang:cancel_timer(TRef),
     St#{tref => undefined}.
+
+make_message_ref() ->
+    erlang:unique_integer([monotonic, positive]).

+ 46 - 10
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -31,7 +31,7 @@
     on_get_status/2
 ]).
 
--export([counter_loop/1, set_callback_mode/1]).
+-export([counter_loop/0, set_callback_mode/1]).
 
 %% callbacks for emqx_resource config schema
 -export([roots/0]).
@@ -84,9 +84,22 @@ on_query(_InstId, get_state, State) ->
     {ok, State};
 on_query(_InstId, get_state_failed, State) ->
     {error, State};
-on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
-    Pid ! {inc, N},
+on_query(_InstId, block, #{pid := Pid}) ->
+    Pid ! block,
+    ok;
+on_query(_InstId, resume, #{pid := Pid}) ->
+    Pid ! resume,
     ok;
+on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
+    ReqRef = make_ref(),
+    From = {self(), ReqRef},
+    Pid ! {From, {inc, N}},
+    receive
+        {ReqRef, ok} -> ok;
+        {ReqRef, incorrect_status} -> {resource_down, incorrect_status}
+    after 1000 ->
+        {error, timeout}
+    end;
 on_query(_InstId, get_counter, #{pid := Pid}) ->
     ReqRef = make_ref(),
     From = {self(), ReqRef},
@@ -97,9 +110,12 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
         {error, timeout}
     end.
 
-on_query_async(_InstId, Query, ReplyFun, State) ->
-    Result = on_query(_InstId, Query, State),
-    apply_reply(ReplyFun, Result).
+on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
+    Pid ! {inc, N, ReplyFun},
+    ok;
+on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
+    Pid ! {get, ReplyFun},
+    ok.
 
 on_batch_query(InstId, BatchReq, State) ->
     %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
@@ -136,15 +152,35 @@ on_get_status(_InstId, #{pid := Pid}) ->
     end.
 
 spawn_counter_process(Name, Register) ->
-    Pid = spawn_link(?MODULE, counter_loop, [#{counter => 0}]),
+    Pid = spawn_link(?MODULE, counter_loop, []),
     true = maybe_register(Name, Pid, Register),
     Pid.
 
-counter_loop(#{counter := Num} = State) ->
+counter_loop() ->
+    counter_loop(#{counter => 0, status => running}).
+
+counter_loop(#{counter := Num, status := Status} = State) ->
     NewState =
         receive
-            {inc, N} ->
-                #{counter => Num + N};
+            block ->
+                ct:pal("counter recv: ~p", [block]),
+                State#{status => blocked};
+            resume ->
+                {messages, Msgs} = erlang:process_info(self(), messages),
+                ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
+                State#{status => running};
+            {inc, N, ReplyFun} when Status == running ->
+                apply_reply(ReplyFun, ok),
+                State#{counter => Num + N};
+            {{FromPid, ReqRef}, {inc, N}} when Status == running ->
+                FromPid ! {ReqRef, ok},
+                State#{counter => Num + N};
+            {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
+                FromPid ! {ReqRef, incorrect_status},
+                State;
+            {get, ReplyFun} ->
+                apply_reply(ReplyFun, Num),
+                State;
             {{FromPid, ReqRef}, get} ->
                 FromPid ! {ReqRef, Num},
                 State

+ 131 - 5
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -236,7 +236,7 @@ t_batch_query_counter(_) ->
 
     ok = emqx_resource:remove_local(?ID).
 
-t_query_counter_async(_) ->
+t_query_counter_async_query(_) ->
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
@@ -271,24 +271,25 @@ t_query_counter_async(_) ->
     ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
     ok = emqx_resource:remove_local(?ID).
 
-t_query_counter_async_2(_) ->
+t_query_counter_async_callback(_) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
 
     Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
     Insert = fun(Tab, Result) ->
         ets:insert(Tab, {make_ref(), Result})
     end,
+    ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{query_mode => async, async_reply_fun => {Insert, [Tab0]}}
+        #{query_mode => async, async_inflight_window => 1000000}
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
     ?check_trace(
         ?TRACE_OPTS,
-        inc_counter_in_parallel(1000),
+        inc_counter_in_parallel(1000, ReqOpts),
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
@@ -321,6 +322,117 @@ t_query_counter_async_2(_) ->
     ),
     ok = emqx_resource:remove_local(?ID).
 
+t_query_counter_async_inflight(_) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    Insert0 = fun(Tab, Result) ->
+        ets:insert(Tab, {make_ref(), Result})
+    end,
+    ReqOpts = #{async_reply_fun => {Insert0, [Tab0]}},
+    WindowSize = 15,
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{
+            query_mode => async,
+            async_inflight_window => WindowSize,
+            worker_pool_size => 1,
+            resume_interval => 300
+        }
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+
+    %% block the resource
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+
+    %% send async query to make the inflight window full
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(WindowSize, ReqOpts),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_query_async, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+
+    %% this will block the resource_worker
+    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+    ?assertMatch(0, ets:info(Tab0, size)),
+    %% sleep to make the resource_worker resume some times
+    timer:sleep(2000),
+
+    %% send query now will fail because the resource is blocked.
+    Insert = fun(Tab, Ref, Result) ->
+        ets:insert(Tab, {Ref, Result})
+    end,
+    ok = emqx_resource:query(?ID, {inc_counter, 1}, #{
+        async_reply_fun => {Insert, [Tab0, tmp_query]}
+    }),
+    ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
+
+    %% all response should be received after the resource is resumed.
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
+    timer:sleep(1000),
+    ?assertEqual(WindowSize, ets:info(Tab0, size)),
+
+    %% send async query, this time everything should be ok.
+    Num = 10,
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(Num, ReqOpts),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_query_async, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+    timer:sleep(1000),
+    ?assertEqual(WindowSize + Num, ets:info(Tab0, size)),
+
+    %% block the resource
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+    %% again, send async query to make the inflight window full
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(WindowSize, ReqOpts),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_query_async, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+
+    %% this will block the resource_worker
+    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+
+    Sent = WindowSize + Num + WindowSize,
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
+    timer:sleep(1000),
+    ?assertEqual(Sent, ets:info(Tab0, size)),
+
+    {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
+    ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
+    ?assert(Sent == Counter),
+
+    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
+    ct:pal("metrics: ~p", [C]),
+    ?assertMatch(
+        #{matched := M, success := S, exception := E, failed := F, resource_down := RD} when
+            M >= Sent andalso M == S + E + F + RD,
+        C
+    ),
+    ?assert(
+        lists:all(
+            fun
+                ({_, ok}) -> true;
+                (_) -> false
+            end,
+            ets:tab2list(Tab0)
+        )
+    ),
+    ok = emqx_resource:remove_local(?ID).
+
 t_healthy_timeout(_) ->
     {ok, _} = emqx_resource:create_local(
         ?ID,
@@ -550,10 +662,13 @@ t_auto_retry(_) ->
 %% Helpers
 %%------------------------------------------------------------------------------
 inc_counter_in_parallel(N) ->
+    inc_counter_in_parallel(N, #{}).
+
+inc_counter_in_parallel(N, Opts) ->
     Parent = self(),
     Pids = [
         erlang:spawn(fun() ->
-            emqx_resource:query(?ID, {inc_counter, 1}),
+            emqx_resource:query(?ID, {inc_counter, 1}, Opts),
             Parent ! {complete, self()}
         end)
      || _ <- lists:seq(1, N)
@@ -567,6 +682,17 @@ inc_counter_in_parallel(N) ->
      || Pid <- Pids
     ].
 
+% verify_inflight_full(WindowSize) ->
+%     ?check_trace(
+%         ?TRACE_OPTS,
+%         emqx_resource:query(?ID, {inc_counter, 1}),
+%         fun(Return, Trace) ->
+%             QueryTrace = ?of_kind(inflight_full, Trace),
+%             ?assertMatch([#{wind_size := WindowSize} | _], QueryTrace),
+%             ?assertMatch(ok, Return)
+%         end
+%     ).
+
 bin_config() ->
     <<"\"name\": \"test_resource\"">>.