Ver código fonte

Merge pull request #6793 from EMQ-YangM/health_check_timeout

feat(emqx_resource_health_check): add timeout to single health_check
Yang Miao 4 anos atrás
pai
commit
a3e3f2216c

+ 4 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -60,6 +60,7 @@
 -export([ restart/1  %% restart the instance.
         , restart/2
         , health_check/1 %% verify if the resource is working normally
+        , set_resource_status_stoped/1 %% set resource status to stopped
         , stop/1   %% stop the instance
         , query/2  %% query the instance
         , query/3  %% query the instance with after_query()
@@ -231,6 +232,9 @@ stop(InstId) ->
 health_check(InstId) ->
     call_instance(InstId, {health_check, InstId}).
 
+set_resource_status_stoped(InstId) -> %% ask the instance identified by InstId to mark itself stopped
+    call_instance(InstId, {set_resource_status_stoped, InstId}). %% returns whatever call_instance/2 returns for this request
+
 -spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
 get_instance(InstId) ->
     emqx_resource_instance:lookup(InstId).

+ 36 - 11
apps/emqx_resource/src/emqx_resource_health_check.erl

@@ -20,7 +20,9 @@
         , delete_checker/1
         ]).
 
--export([health_check/2]).
+-export([ start_health_check/2
+        , health_check_timeout_checker/3
+        ]).
 
 -define(SUP, emqx_resource_health_check_sup).
 -define(ID(NAME), {resource_health_check, NAME}).
@@ -32,7 +34,7 @@ child_spec(Name, Sleep) ->
       shutdown => 5000, type => worker, modules => [?MODULE]}.
 
 start_link(Name, Sleep) ->
-    Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
+    Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep]),
     {ok, Pid}.
 
 create_checker(Name, Sleep) ->
@@ -54,13 +56,36 @@ delete_checker(Name) ->
         Error -> Error
 	end.
 
-health_check(Name, SleepTime) ->
-    case emqx_resource:health_check(Name) of
-        ok ->
-            emqx_alarm:deactivate(Name);
-        {error, _} ->
-            emqx_alarm:activate(Name, #{name => Name},
-                <<Name/binary, " health check failed">>)
+start_health_check(Name, Sleep) -> %% entry point of the process spawned by start_link/2
+    Pid = self(),
+    _ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]), %% linked companion; its pid is not kept
+    health_check(Name). %% enter the worker loop, which recurses forever
+
+health_check(Name) -> %% worker loop: runs one health check per {Pid, begin_health_check} request
+    receive
+        {Pid, begin_health_check}  -> %% Pid is the sender that receives the completion notice; other messages are not matched here
+            case emqx_resource:health_check(Name) of
+                ok ->
+                    emqx_alarm:deactivate(Name); %% check passed: deactivate the alarm named after the resource
+                {error, _} ->
+                    emqx_alarm:activate(Name, #{name => Name},
+                        <<Name/binary, " health check failed">>) %% Name must be a binary for this message to build
+            end,
+            Pid ! health_check_finish %% tell the requester this round is done
+    end,
+    health_check(Name). %% wait for the next request
+
+%% Companion loop of the health-check worker `Pid'.
+%%
+%% Each round sends `{self(), begin_health_check}' to `Pid' and waits for the
+%% `health_check_finish' reply:
+%%   * reply within 10000 ms: sleep `SleepTime' ms, then start the next round;
+%%   * no reply in time: activate the alarm `Name' with a timeout message, call
+%%     emqx_resource:set_resource_status_stoped(Name), then keep waiting for the
+%%     late reply before sleeping and starting the next round.
+%%
+%% `Name' is the resource id and must be a binary (it is spliced into the
+%% alarm message). The function never returns.
+health_check_timeout_checker(Pid, Name, SleepTime) ->
+    SelfPid = self(),
+    Pid ! {SelfPid, begin_health_check},
+    receive
+        health_check_finish -> timer:sleep(SleepTime)
+    after 10000 ->
+        %% The worker did not answer within 10 seconds: raise the alarm
+        %% (message spelling fixed: "timeout", previously "timout").
+        emqx_alarm:activate(Name, #{name => Name},
+                        <<Name/binary, " health check timeout">>),
+        emqx_resource:set_resource_status_stoped(Name),
+        %% Deliberately no `after' clause: a new round is only started once
+        %% the outstanding check has reported back, so requests never pile
+        %% up in the worker's mailbox.
+        receive
+            health_check_finish -> timer:sleep(SleepTime)
+        end
     end,
-    timer:sleep(SleepTime),
-    health_check(Name, SleepTime).
+    health_check_timeout_checker(Pid, Name, SleepTime).

+ 11 - 0
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -120,6 +120,9 @@ handle_call({stop, InstId}, _From, State) ->
 handle_call({health_check, InstId}, _From, State) ->
     {reply, do_health_check(InstId), State};
 
+handle_call({set_resource_status_stoped, InstId}, _From, State) ->
+    {reply, do_set_resource_status_stoped(InstId), State};
+
 handle_call(Req, _From, State) ->
     logger:error("Received unexpected call: ~p", [Req]),
     {reply, ignored, State}.
@@ -280,6 +283,14 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
             {error, Reason}
     end.
 
+%% Marks the resource instance `InstId' as stopped after a health-check timeout.
+%%
+%% Looks the instance up, logs the timeout and rewrites its entry in the
+%% `emqx_resource_instance' ETS table with `status => stopped'.
+%%
+%% Returns `true' (the result of ets:insert/2) on success, or `{error, Reason}'
+%% when the instance cannot be found.
+do_set_resource_status_stoped(InstId) ->
+    case emqx_resource_instance:lookup(InstId) of
+        {ok, #{id := InstId} = Data} ->
+            logger:error("health check for ~p failed: timeout", [InstId]),
+            ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}});
+        {error, _} = Error ->
+            %% lookup/1 already yields a tagged error; pass it through as-is
+            %% instead of wrapping it into {error, {error, Reason}}.
+            Error;
+        Unexpected ->
+            %% Anything else (e.g. data whose id does not match) is still
+            %% reported as an error rather than crashing the caller.
+            {error, Unexpected}
+    end.
+
 %%------------------------------------------------------------------------------
 %% internal functions
 %%------------------------------------------------------------------------------

+ 3 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -104,8 +104,9 @@ t_healthy(_) ->
     {ok, _} = emqx_resource:create_local(
                 ?ID,
                 ?TEST_RESOURCE,
-                #{name => test_resource}),
-
+                #{name => <<"test_resource">>}, #{async_create => true}),
+    timer:sleep(300),
+    emqx_resource_health_check:create_checker(?ID, 15000),
     #{pid := Pid} = emqx_resource:query(?ID, get_state),
 
     ok = emqx_resource:health_check(?ID),