Explorar o código

Merge pull request #7840 from EMQ-YangM/fix_resource_health_check

fix: set resource status disconnected when health check timeout
DDDHuang %!s(int64=3) %!d(string=hai) anos
pai
achega
bb87092358

+ 4 - 2
apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl

@@ -68,8 +68,10 @@ get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) ->
     Status = [
         begin
             case ecpool_worker:client(Worker) of
-                {ok, Conn} -> CheckFunc(Conn);
-                _ -> false
+                {ok, Conn} ->
+                    erlang:is_process_alive(Conn) andalso CheckFunc(Conn);
+                _ ->
+                    false
             end
         end
      || {_WorkerName, Worker} <- ecpool:workers(PoolName)

+ 10 - 1
apps/emqx_resource/src/emqx_resource_health_check.erl

@@ -15,6 +15,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_resource_health_check).
 
+-include_lib("emqx/include/logger.hrl").
+
 -export([
     start_link/3,
     create_checker/3,
@@ -98,7 +100,14 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
             #{name => Name},
             <<Name/binary, " health check timeout">>
         ),
-        emqx_resource:set_resource_status_connecting(Name),
+        ?SLOG(
+            error,
+            #{
+                msg => "health check failed: timeout",
+                name => Name
+            }
+        ),
+        _ = emqx_resource_instance:set_resource_status(Name, disconnected),
         receive
             health_check_finish -> timer:sleep(SleepTime)
         end

+ 20 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -19,6 +19,7 @@
 
 -include("emqx_resource.hrl").
 -include("emqx_resource_utils.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -export([start_link/2]).
 
@@ -28,7 +29,8 @@
     get_metrics/1,
     reset_metrics/1,
     list_all/0,
-    list_group/1
+    list_group/1,
+    set_resource_status/2
 ]).
 
 -export([
@@ -339,6 +341,23 @@ do_set_resource_status_connecting(InstId) ->
             {error, Error}
     end.
 
+-spec set_resource_status(instance_id(), resource_connection_status()) -> ok | {error, term()}.
+set_resource_status(InstId, Status) ->
+    case lookup(InstId) of
+        {ok, Group, #{id := _} = Data} ->
+            update_resource(InstId, Group, Data#{status => Status});
+        Error ->
+            ?SLOG(
+                error,
+                #{
+                    msg => "set resource status field",
+                    resource_id => InstId,
+                    reason => Error
+                }
+            ),
+            Error
+    end.
+
 %%------------------------------------------------------------------------------
 %% internal functions
 %%------------------------------------------------------------------------------