|
|
@@ -6,10 +6,11 @@
|
|
|
|
|
|
-behaviour(emqx_resource).
|
|
|
|
|
|
--include_lib("typerefl/include/types.hrl").
|
|
|
+-include_lib("hocon/include/hoconsc.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
+-include_lib("typerefl/include/types.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
--include_lib("hocon/include/hoconsc.hrl").
|
|
|
+-include_lib("emqx_resource/include/emqx_resource.hrl").
|
|
|
|
|
|
-export([namespace/0, roots/0, fields/1, desc/1]).
|
|
|
|
|
|
@@ -209,18 +210,50 @@ on_batch_query(InstanceId, BatchReq, State) ->
|
|
|
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
|
|
{error, {unrecoverable_error, invalid_request}}.
|
|
|
|
|
|
-on_get_status(_InstanceId, #{pool_name := PoolName}) ->
|
|
|
- Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
|
|
|
- status_result(Health).
|
|
|
+on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
|
|
|
+ case
|
|
|
+ emqx_resource_pool:health_check_workers(
|
|
|
+ PoolName,
|
|
|
+ fun ?MODULE:do_get_status/1,
|
|
|
+ emqx_resource_pool:health_check_timeout(),
|
|
|
+ #{return_values => true}
|
|
|
+ )
|
|
|
+ of
|
|
|
+ {ok, []} ->
|
|
|
+ {?status_connecting, State, undefined};
|
|
|
+ {ok, Values} ->
|
|
|
+ case lists:keyfind(error, 1, Values) of
|
|
|
+ false ->
|
|
|
+ ?status_connected;
|
|
|
+ {error, Reason} ->
|
|
|
+ {?status_connecting, State, enhance_reason(Reason)}
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ {?status_connecting, State, enhance_reason(Reason)}
|
|
|
+ end.
|
|
|
|
|
|
do_get_status(Conn) ->
|
|
|
- case tdengine:insert(Conn, "select server_version()", []) of
|
|
|
- {ok, _} -> true;
|
|
|
- _ -> false
|
|
|
+ try
|
|
|
+ tdengine:insert(
|
|
|
+ Conn,
|
|
|
+ "select server_version()",
|
|
|
+ [],
|
|
|
+ emqx_resource_pool:health_check_timeout()
|
|
|
+ )
|
|
|
+ of
|
|
|
+ {ok, _} ->
|
|
|
+ true;
|
|
|
+ {error, _} = Error ->
|
|
|
+ Error
|
|
|
+ catch
|
|
|
+ _Type:Reason ->
|
|
|
+ {error, Reason}
|
|
|
end.
|
|
|
|
|
|
-status_result(_Status = true) -> connected;
|
|
|
-status_result(_Status = false) -> connecting.
|
|
|
+enhance_reason(timeout) ->
|
|
|
+ connection_timeout;
|
|
|
+enhance_reason(Reason) ->
|
|
|
+ Reason.
|
|
|
|
|
|
on_add_channel(
|
|
|
_InstanceId,
|
|
|
@@ -253,7 +286,12 @@ on_get_channels(InstanceId) ->
|
|
|
on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
|
|
|
case maps:is_key(ChannelId, Channels) of
|
|
|
true ->
|
|
|
- on_get_status(InstanceId, State);
|
|
|
+ case on_get_status(InstanceId, State) of
|
|
|
+ {Status, _State, Reason} ->
|
|
|
+ {Status, Reason};
|
|
|
+ Status ->
|
|
|
+ Status
|
|
|
+ end;
|
|
|
_ ->
|
|
|
{error, not_exists}
|
|
|
end.
|