Jelajahi Sumber

Merge pull request #9884 from savonarola/resource-fixes

fix(resources): fix resource lifecycle
Zaiming (Stone) Shi 3 tahun lalu
induk
melakukan
13ef30c46c

+ 1 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -195,7 +195,7 @@ init({Id, Index, Opts}) ->
     {ok, running, Data}.
 
 running(enter, _, Data) ->
-    ?tp(buffer_worker_enter_running, #{}),
+    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}),
     %% According to `gen_statem' laws, we mustn't call `maybe_flush'
     %% directly because it may decide to return `{next_state, blocked, _}',
     %% and that's an invalid response for a state enter call.

+ 9 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl

@@ -23,7 +23,7 @@
 %% External API
 -export([start_link/0]).
 
--export([start_workers/2, stop_workers/2]).
+-export([start_workers/2, stop_workers/2, worker_pids/1]).
 
 %% Callbacks
 -export([init/1]).
@@ -75,6 +75,14 @@ stop_workers(ResId, Opts) ->
     ensure_worker_pool_removed(ResId),
     ok.
 
+worker_pids(ResId) ->
+    lists:map(
+        fun({_Name, Pid}) ->
+            Pid
+        end,
+        gproc_pool:active_workers(ResId)
+    ).
+
 %%%=============================================================================
 %%% Internal
 %%%=============================================================================

+ 7 - 7
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -555,12 +555,14 @@ handle_connected_health_check(Data) ->
         end
     ).
 
+with_health_check(#data{state = undefined} = Data, Func) ->
+    Func(disconnected, Data);
 with_health_check(Data, Func) ->
     ResId = Data#data.id,
     HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
     {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
     _ = maybe_alarm(Status, ResId),
-    ok = maybe_resume_resource_workers(Status),
+    ok = maybe_resume_resource_workers(ResId, Status),
     UpdatedData = Data#data{
         state = NewState, status = Status, error = Err
     },
@@ -581,14 +583,12 @@ maybe_alarm(_Status, ResId) ->
         <<"resource down: ", ResId/binary>>
     ).
 
-maybe_resume_resource_workers(connected) ->
+maybe_resume_resource_workers(ResId, connected) ->
     lists:foreach(
-        fun({_, Pid, _, _}) ->
-            emqx_resource_buffer_worker:resume(Pid)
-        end,
-        supervisor:which_children(emqx_resource_buffer_worker_sup)
+        fun emqx_resource_buffer_worker:resume/1,
+        emqx_resource_buffer_worker_sup:worker_pids(ResId)
     );
-maybe_resume_resource_workers(_) ->
+maybe_resume_resource_workers(_, _) ->
     ok.
 
 maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->

+ 58 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -24,6 +24,7 @@
 
 -define(TEST_RESOURCE, emqx_connector_demo).
 -define(ID, <<"id">>).
+-define(ID1, <<"id1">>).
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
@@ -1029,6 +1030,63 @@ t_auto_retry(_) ->
     ),
     ?assertEqual(ok, Res).
 
+t_health_check_disconnected(_) ->
+    _ = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, create_error => true},
+        #{auto_retry_interval => 100}
+    ),
+    ?assertEqual(
+        {ok, disconnected},
+        emqx_resource:health_check(?ID)
+    ).
+
+t_unblock_only_required_buffer_workers(_) ->
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 5
+        }
+    ),
+    lists:foreach(
+        fun emqx_resource_buffer_worker:block/1,
+        emqx_resource_buffer_worker_sup:worker_pids(?ID)
+    ),
+    emqx_resource:create(
+        ?ID1,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 5
+        }
+    ),
+    %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers
+    %% so we should see resumes now (`buffer_worker_enter_running`).
+    ?check_trace(
+        ?wait_async_action(
+            lists:foreach(
+                fun emqx_resource_buffer_worker:resume/1,
+                emqx_resource_buffer_worker_sup:worker_pids(?ID)
+            ),
+            #{?snk_kind := buffer_worker_enter_running},
+            5000
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{id := ?ID} | _],
+                ?of_kind(buffer_worker_enter_running, Trace)
+            )
+        end
+    ).
+
 t_retry_batch(_Config) ->
     {ok, _} = emqx_resource:create(
         ?ID,

+ 2 - 0
changes/v5.0.16/fix-9884.en.md

@@ -0,0 +1,2 @@
+Do not resume all buffer workers on successful health check of any individual resource.
+Previously, after any successful health check, all buffer workers (for all resources) were resumed.

+ 2 - 0
changes/v5.0.16/fix-9884.zh.md

@@ -0,0 +1,2 @@
+不在任意一个资源健康检查成功时恢复所有资源发送缓存。
+在此修复之前，在任意一个资源成功进行健康检查后，所有资源的缓存都会尝试恢复。