Explorar o código

fix: resume the resource worker on health check success

Shawn %!s(int64=3) %!d(string=hai) anos
pai
achega
c4106c0d77

+ 7 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -555,6 +555,7 @@ with_health_check(Data, Func) ->
     HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
     {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
     _ = maybe_alarm(Status, ResId),
+    ok = maybe_resume_resource_workers(Status),
     UpdatedData = Data#data{
         state = NewState, status = Status, error = Err
     },
@@ -575,6 +576,12 @@ maybe_alarm(_Status, ResId) ->
         <<"resource down: ", ResId/binary>>
     ).
 
+maybe_resume_resource_workers(connected) ->
+    {_, Pid, _, _} = supervisor:which_children(emqx_resource_worker_sup),
+    emqx_resource_worker:resume(Pid);
+maybe_resume_resource_workers(_) ->
+    ok.
+
 maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
     ok;
 maybe_clear_alarm(ResId) ->

+ 3 - 3
apps/emqx_resource/src/emqx_resource_worker_sup.erl

@@ -107,7 +107,7 @@ ensure_worker_started(ResId, Idx, Opts) ->
         type => worker,
         modules => [Mod]
     },
-    case supervisor:start_child(emqx_resource_sup, Spec) of
+    case supervisor:start_child(?SERVER, Spec) of
         {ok, _Pid} -> ok;
         {error, {already_started, _}} -> ok;
         {error, already_present} -> ok;
@@ -116,9 +116,9 @@ ensure_worker_started(ResId, Idx, Opts) ->
 
 ensure_worker_removed(ResId, Idx) ->
     ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
-    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+    case supervisor:terminate_child(?SERVER, ChildId) of
         ok ->
-            Res = supervisor:delete_child(emqx_resource_sup, ChildId),
+            Res = supervisor:delete_child(?SERVER, ChildId),
             _ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
             Res;
         {error, not_found} ->