|
|
@@ -178,14 +178,34 @@ on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) ->
|
|
|
on_batch_query(_InstanceId, Query, _State) ->
|
|
|
{error, {unrecoverable_error, {invalid_request, Query}}}.
|
|
|
|
|
|
-on_get_status(_InstanceId, #{pool_name := Pool}) ->
|
|
|
+health_check_timeout() ->
|
|
|
+ 15000.
|
|
|
+
|
|
|
+on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
|
|
|
Health = emqx_resource_pool:health_check_workers(
|
|
|
- Pool, {emqx_bridge_dynamo_connector_client, is_connected, []}
|
|
|
+ Pool,
|
|
|
+ {emqx_bridge_dynamo_connector_client, is_connected, [
|
|
|
+ emqx_resource_pool:health_check_timeout()
|
|
|
+ ]},
|
|
|
+ health_check_timeout(),
|
|
|
+ #{return_values => true}
|
|
|
),
|
|
|
- status_result(Health).
|
|
|
+ case Health of
|
|
|
+ {error, timeout} ->
|
|
|
+ {?status_connecting, State, <<"timeout_while_checking_connection">>};
|
|
|
+ {ok, Results} ->
|
|
|
+ status_result(Results, State)
|
|
|
+ end.
|
|
|
|
|
|
-status_result(_Status = true) -> ?status_connected;
|
|
|
-status_result(_Status = false) -> ?status_connecting.
|
|
|
+status_result(Results, State) ->
|
|
|
+ case lists:filter(fun(Res) -> Res =/= true end, Results) of
|
|
|
+ [] when Results =:= [] ->
|
|
|
+ ?status_connecting;
|
|
|
+ [] ->
|
|
|
+ ?status_connected;
|
|
|
+ [{false, Error} | _] ->
|
|
|
+ {?status_connecting, State, Error}
|
|
|
+ end.
|
|
|
|
|
|
%%========================================================================================
|
|
|
%% Helper fns
|