|
|
@@ -213,14 +213,18 @@ do_create_dry_run(ResourceType, Config) ->
|
|
|
InstId = make_test_id(),
|
|
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
|
|
{ok, ResourceState} ->
|
|
|
- case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
|
|
- {ok, _} ->
|
|
|
- case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
|
|
|
- {error, _} = Error -> Error;
|
|
|
- _ -> ok
|
|
|
- end;
|
|
|
- {error, Reason, _} ->
|
|
|
- {error, Reason}
|
|
|
+ Health =
|
|
|
+ case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
|
|
+ connected ->
|
|
|
+ ok;
|
|
|
+ {connected, _N} ->
|
|
|
+ ok;
|
|
|
+ ConnectStatus ->
|
|
|
+ {error, ConnectStatus}
|
|
|
+ end,
|
|
|
+ case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
|
|
|
+ {error, _} = Error -> Error;
|
|
|
+ _ -> Health
|
|
|
end;
|
|
|
{error, Reason} ->
|
|
|
{error, Reason}
|
|
|
@@ -262,7 +266,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
|
|
|
state => undefined
|
|
|
},
|
|
|
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
|
|
- ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
|
|
|
+ update_resource(InstId, Group, InitData),
|
|
|
spawn(fun() ->
|
|
|
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
|
|
|
end),
|
|
|
@@ -273,10 +277,10 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
|
|
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
|
|
{ok, ResourceState} ->
|
|
|
Data2 = Data#{state => ResourceState, status => connected},
|
|
|
- ets:insert(emqx_resource_instance, {InstId, Group, Data2}),
|
|
|
+ update_resource(InstId, Group, Data2),
|
|
|
create_default_checker(InstId, Opts);
|
|
|
{error, Reason} ->
|
|
|
- ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
|
|
|
+ update_resource(InstId, Group, Data#{status => disconnected}),
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
@@ -295,7 +299,7 @@ do_stop(_Group, #{state := undefined}) ->
|
|
|
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
|
|
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
|
|
_ = emqx_resource_health_check:delete_checker(InstId),
|
|
|
- ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
|
|
|
+ update_resource(InstId, Group, Data#{status => disconnected}),
|
|
|
ok.
|
|
|
|
|
|
do_health_check(InstId) when is_binary(InstId) ->
|
|
|
@@ -303,35 +307,34 @@ do_health_check(InstId) when is_binary(InstId) ->
|
|
|
|
|
|
do_health_check(_Group, #{state := undefined}) ->
|
|
|
{error, resource_not_initialized};
|
|
|
-do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
|
|
- case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
|
|
|
- {ok, ResourceState1} ->
|
|
|
- ets:insert(
|
|
|
- emqx_resource_instance,
|
|
|
- {InstId, Group, Data#{status => connected, state => ResourceState1}}
|
|
|
- ),
|
|
|
+do_health_check(
|
|
|
+ Group,
|
|
|
+ #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data
|
|
|
+) ->
|
|
|
+ case emqx_resource:call_health_check(InstId, Mod, ResourceState) of
|
|
|
+ {NewConnStatus, NewResourceState} ->
|
|
|
+ NData = Data#{status => NewConnStatus, state => NewResourceState},
|
|
|
+ update_resource(InstId, Group, NData),
|
|
|
+ maybe_log_health_check_result(InstId, NewConnStatus);
|
|
|
+ NewConnStatus ->
|
|
|
+ NData = Data#{status => NewConnStatus},
|
|
|
+ NewConnStatus /= OldStatus andalso update_resource(InstId, Group, NData),
|
|
|
+ maybe_log_health_check_result(InstId, NewConnStatus)
|
|
|
+ end.
|
|
|
+
|
|
|
+maybe_log_health_check_result(InstId, Result) ->
|
|
|
+ case Result of
|
|
|
+ connected ->
|
|
|
ok;
|
|
|
- {error, Reason} ->
|
|
|
- logger:error("health check for ~p failed: ~p", [InstId, Reason]),
|
|
|
- ets:insert(
|
|
|
- emqx_resource_instance,
|
|
|
- {InstId, Group, Data#{status => connecting}}
|
|
|
- ),
|
|
|
- {error, Reason};
|
|
|
- {error, Reason, ResourceState1} ->
|
|
|
- logger:error("health check for ~p failed: ~p", [InstId, Reason]),
|
|
|
- ets:insert(
|
|
|
- emqx_resource_instance,
|
|
|
- {InstId, Group, Data#{status => connecting, state => ResourceState1}}
|
|
|
- ),
|
|
|
- {error, Reason}
|
|
|
+ ConnectStatus ->
|
|
|
+ logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus])
|
|
|
end.
|
|
|
|
|
|
do_set_resource_status_connecting(InstId) ->
|
|
|
case emqx_resource_instance:lookup(InstId) of
|
|
|
{ok, Group, #{id := InstId} = Data} ->
|
|
|
logger:error("health check for ~p failed: timeout", [InstId]),
|
|
|
- ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}});
|
|
|
+ update_resource(InstId, Group, Data#{status => connecting});
|
|
|
Error ->
|
|
|
{error, Error}
|
|
|
end.
|
|
|
@@ -340,6 +343,9 @@ do_set_resource_status_connecting(InstId) ->
|
|
|
%% internal functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
+update_resource(InstId, Group, Data) ->
|
|
|
+ ets:insert(emqx_resource_instance, {InstId, Group, Data}).
|
|
|
+
|
|
|
do_with_group_and_instance_data(InstId, Do, Args) ->
|
|
|
case lookup(InstId) of
|
|
|
{ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]);
|