|
|
@@ -19,6 +19,8 @@
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
+-include_lib("emqx_resource/include/emqx_resource.hrl").
|
|
|
|
|
|
-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
|
|
|
|
|
|
@@ -231,7 +233,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
|
|
|
is_unrecoverable_error(_) ->
|
|
|
false.
|
|
|
|
|
|
-on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
|
|
|
+on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
|
|
|
case eredis_cluster:pool_exists(PoolName) of
|
|
|
true ->
|
|
|
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
|
|
|
@@ -240,26 +242,51 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
|
|
|
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
|
|
|
case eredis_cluster_monitor:get_all_pools(PoolName) of
|
|
|
[] ->
|
|
|
- disconnected;
|
|
|
+ ?status_disconnected;
|
|
|
[_ | _] ->
|
|
|
- Health = eredis_cluster:ping_all(PoolName),
|
|
|
- status_result(Health)
|
|
|
+ do_cluster_status_check(PoolName, State)
|
|
|
end;
|
|
|
false ->
|
|
|
- disconnected
|
|
|
+ ?status_disconnected
|
|
|
end;
|
|
|
-on_get_status(_InstId, #{pool_name := PoolName}) ->
|
|
|
- Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
|
|
|
- status_result(Health).
|
|
|
-
|
|
|
-do_get_status(Conn) ->
|
|
|
- case eredis:q(Conn, ["PING"]) of
|
|
|
- {ok, _} -> true;
|
|
|
- _ -> false
|
|
|
+on_get_status(_InstId, #{pool_name := PoolName} = State) ->
|
|
|
+ HealthCheckResoults = emqx_resource_pool:health_check_workers(
|
|
|
+ PoolName,
|
|
|
+ fun ?MODULE:do_get_status/1,
|
|
|
+ emqx_resource_pool:health_check_timeout(),
|
|
|
+ #{return_values => true}
|
|
|
+ ),
|
|
|
+ case HealthCheckResoults of
|
|
|
+ {ok, Results} ->
|
|
|
+ sum_worker_results(Results, State);
|
|
|
+ Error ->
|
|
|
+ {?status_disconnected, State, Error}
|
|
|
end.
|
|
|
|
|
|
-status_result(_Status = true) -> connected;
|
|
|
-status_result(_Status = false) -> connecting.
|
|
|
+do_cluster_status_check(Pool, State) ->
|
|
|
+ Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
|
|
|
+ sum_worker_results(Pongs, State).
|
|
|
+
|
|
|
+do_get_status(Conn) ->
|
|
|
+ eredis:q(Conn, ["PING"]).
|
|
|
+
|
|
|
+sum_worker_results([], _State) ->
|
|
|
+ ?status_connected;
|
|
|
+sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
|
|
|
+ ?tp(emqx_redis_auth_required_error, #{}),
|
|
|
+ %% This requires user action to fix so we set the status to disconnected
|
|
|
+ {?status_disconnected, State, {unhealthy_target, Error}};
|
|
|
+sum_worker_results([{ok, _} | Rest], State) ->
|
|
|
+ sum_worker_results(Rest, State);
|
|
|
+sum_worker_results([Error | _Rest], State) ->
|
|
|
+ ?SLOG(
|
|
|
+ warning,
|
|
|
+ #{
|
|
|
+ msg => "emqx_redis_check_status_error",
|
|
|
+ error => Error
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ {?status_connecting, State, Error}.
|
|
|
|
|
|
do_cmd(PoolName, cluster, {cmd, Command}) ->
|
|
|
eredis_cluster:q(PoolName, Command);
|