Browse Source

Merge pull request #9922 from keynslug/fix/EMQX-8876/inflight-stuck-full

fix(bufworker): do not avoid retry if inflight table is full
Zaiming (Stone) Shi 3 years ago
parent
commit
73d5592b5a

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 2 - 12
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -334,12 +334,7 @@ resume_from_blocked(Data) ->
         {single, Ref, Query} ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            case is_inflight_full(InflightTID) of
-                true ->
-                    {keep_state, Data};
-                false ->
-                    retry_inflight_sync(Ref, Query, Data)
-            end;
+            retry_inflight_sync(Ref, Query, Data);
         {batch, Ref, NotExpired, Expired} ->
             update_inflight_item(InflightTID, Ref, NotExpired),
             NumExpired = length(Expired),
@@ -347,12 +342,7 @@ resume_from_blocked(Data) ->
             NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            case is_inflight_full(InflightTID) of
-                true ->
-                    {keep_state, Data};
-                false ->
-                    retry_inflight_sync(Ref, NotExpired, Data)
-            end
+            retry_inflight_sync(Ref, NotExpired, Data)
     end.
 
 retry_inflight_sync(Ref, QueryOrBatch, Data0) ->

+ 33 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -134,6 +134,17 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
         {ReqRef, Num} -> {ok, Num}
     after 1000 ->
         {error, timeout}
+    end;
+on_query(_InstId, {sleep, For}, #{pid := Pid}) ->
+    ?tp(connector_demo_sleep, #{mode => sync, for => For}),
+    ReqRef = make_ref(),
+    From = {self(), ReqRef},
+    Pid ! {From, {sleep, For}},
+    receive
+        {ReqRef, Result} ->
+            Result
+    after 1000 ->
+        {error, timeout}
     end.
 
 on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
@@ -147,6 +158,10 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
     {ok, Pid};
 on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
     Pid ! {big_payload, Payload, ReplyFun},
+    {ok, Pid};
+on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) ->
+    ?tp(connector_demo_sleep, #{mode => async, for => For}),
+    Pid ! {{sleep, For}, ReplyFun},
     {ok, Pid}.
 
 on_batch_query(InstId, BatchReq, State) ->
@@ -283,10 +298,28 @@ counter_loop(
                 State;
             {{FromPid, ReqRef}, get} ->
                 FromPid ! {ReqRef, Num},
+                State;
+            {{sleep, _} = SleepQ, ReplyFun} ->
+                apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
+                State;
+            {{FromPid, ReqRef}, {sleep, _} = SleepQ} ->
+                FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
                 State
         end,
     counter_loop(NewState).
 
+handle_query(Mode, {sleep, For} = Query, Status) ->
+    ok = timer:sleep(For),
+    Result =
+        case Status of
+            running -> ok;
+            blocked -> {error, {recoverable_error, blocked}}
+        end,
+    ?tp(connector_demo_sleep_handled, #{
+        mode => Mode, query => Query, slept => For, result => Result
+    }),
+    Result.
+
 maybe_register(Name, Pid, true) ->
     ct:pal("---- Register Name: ~p", [Name]),
     ct:pal("---- whereis(): ~p", [whereis(Name)]),

+ 70 - 14
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1452,6 +1452,61 @@ t_retry_async_inflight(_Config) ->
     ),
     ok.
 
+t_retry_async_inflight_full(_Config) ->
+    ResumeInterval = 1_000,
+    AsyncInflightWindow = 5,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => ?FUNCTION_NAME},
+        #{
+            query_mode => async,
+            async_inflight_window => AsyncInflightWindow,
+            batch_size => 1,
+            batch_time => 20,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    ?check_trace(
+        #{timetrap => 15_000},
+        begin
+            %% block
+            ok = emqx_resource:simple_sync_query(?ID, block),
+
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    inc_counter_in_parallel(
+                        AsyncInflightWindow * 2,
+                        fun() ->
+                            For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
+                            {sleep, For}
+                        end,
+                        #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
+                    ),
+                    #{?snk_kind := buffer_worker_flush_but_inflight_full},
+                    ResumeInterval * 2
+                ),
+
+            %% will reply with success after the resource is healed
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := buffer_worker_enter_running}
+                ),
+            ok
+        end,
+        [
+            fun(Trace) ->
+                ?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace))
+            end
+        ]
+    ),
+    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
+    ok.
+
 t_retry_async_inflight_batch(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(async_if_possible),
@@ -2241,18 +2296,16 @@ t_expiration_retry_batch_multiple_times(_Config) ->
 %%------------------------------------------------------------------------------
 
 inc_counter_in_parallel(N) ->
-    inc_counter_in_parallel(N, #{}).
+    inc_counter_in_parallel(N, {inc_counter, 1}, #{}).
 
 inc_counter_in_parallel(N, Opts0) ->
+    inc_counter_in_parallel(N, {inc_counter, 1}, Opts0).
+
+inc_counter_in_parallel(N, Query, Opts) ->
     Parent = self(),
     Pids = [
         erlang:spawn(fun() ->
-            Opts =
-                case is_function(Opts0) of
-                    true -> Opts0();
-                    false -> Opts0
-                end,
-            emqx_resource:query(?ID, {inc_counter, 1}, Opts),
+            emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)),
             Parent ! {complete, self()}
         end)
      || _ <- lists:seq(1, N)
@@ -2267,16 +2320,11 @@ inc_counter_in_parallel(N, Opts0) ->
     ],
     ok.
 
-inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
+inc_counter_in_parallel_increasing(N, StartN, Opts) ->
     Parent = self(),
     Pids = [
         erlang:spawn(fun() ->
-            Opts =
-                case is_function(Opts0) of
-                    true -> Opts0();
-                    false -> Opts0
-                end,
-            emqx_resource:query(?ID, {inc_counter, M}, Opts),
+            emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)),
             Parent ! {complete, self()}
         end)
      || M <- lists:seq(StartN, StartN + N - 1)
@@ -2290,6 +2338,14 @@ inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
      || Pid <- Pids
     ].
 
+maybe_apply(FunOrTerm) ->
+    maybe_apply(FunOrTerm, []).
+
+maybe_apply(Fun, Args) when is_function(Fun) ->
+    erlang:apply(Fun, Args);
+maybe_apply(Term, _Args) ->
+    Term.
+
 bin_config() ->
     <<"\"name\": \"test_resource\"">>.
 

+ 1 - 0
changes/v5.0.17/fix-9922.en.md

@@ -0,0 +1 @@
+Fix the issue with the bridge resource buffer where it might become stuck if enough async queries fill the inflight window full before failing with retryable errors.

+ 1 - 0
changes/v5.0.17/fix-9922.zh.md

@@ -0,0 +1 @@
+修复桥接资源缓冲区的问题,如果足够多的异步查询在失败并出现可重试错误之前将机上窗口填满,则可能会卡住。