Просмотр исходного кода

Merge pull request #7003 from mononym/EMQX-3145-redis

feat: add basic redis tests and bring redis connector module in line …
Chris Hicks 3 лет назад
Родитель
Сommit
f7d78d0600

+ 1 - 1
apps/emqx_connector/rebar.config

@@ -13,7 +13,7 @@
   %% NOTE: mind poolboy version when updating mongodb-erlang version
   %% NOTE: mind poolboy version when updating mongodb-erlang version
   {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}},
   {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}},
   %% NOTE: mind poolboy version when updating eredis_cluster version
   %% NOTE: mind poolboy version when updating eredis_cluster version
-  {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}},
+  {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.0"}}},
   %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
   %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
   %% (which has overflow_ttl feature added).
   %% (which has overflow_ttl feature added).
   %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07).
   %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07).

+ 27 - 10
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -129,10 +129,13 @@ on_start(InstId, #{redis_type := Type,
             end
             end
     end.
     end.
 
 
-on_stop(InstId, #{poolname := PoolName}) ->
+on_stop(InstId, #{poolname := PoolName, type := Type}) ->
     ?SLOG(info, #{msg => "stopping_redis_connector",
     ?SLOG(info, #{msg => "stopping_redis_connector",
                   connector => InstId}),
                   connector => InstId}),
-    emqx_plugin_libs_pool:stop_pool(PoolName).
+    case Type of
+        cluster -> eredis_cluster:stop_pool(PoolName);
+        _ -> emqx_plugin_libs_pool:stop_pool(PoolName)
+    end.
 
 
 on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
 on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
     ?TRACE("QUERY", "redis_connector_received",
     ?TRACE("QUERY", "redis_connector_received",
@@ -151,16 +154,30 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T
     end,
     end,
     Result.
     Result.
 
 
+extract_eredis_cluster_workers(PoolName) ->
+    lists:flatten([gen_server:call(PoolPid, get_all_workers) ||
+                             PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]).
+
+eredis_cluster_workers_exist_and_are_connected(Workers) ->
+    length(Workers) > 0 andalso lists:all(
+        fun({_, Pid, _, _}) ->
+            eredis_cluster_pool_worker:is_connected(Pid) =:= true
+        end, Workers).
+
 on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) ->
 on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) ->
-    Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) ||
-                             PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]),
-    case length(Workers) > 0 andalso lists:all(
-            fun({_, Pid, _, _}) ->
-                eredis_cluster_pool_worker:is_connected(Pid) =:= true
-            end, Workers) of
-        true -> {ok, State};
-        false -> {error, health_check_failed, State}
+    case eredis_cluster:pool_exists(PoolName) of
+        true ->
+            Workers = extract_eredis_cluster_workers(PoolName),
+            case eredis_cluster_workers_exist_and_are_connected(Workers) of
+                true -> {ok, State};
+                false -> {error, health_check_failed, State}
+            end;
+
+        false ->
+            {error, health_check_failed, State}
     end;
     end;
+
+
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
     emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
     emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
 
 

+ 139 - 0
apps/emqx_connector/test/emqx_connector_redis_SUITE.erl

@@ -0,0 +1,139 @@
+% %%--------------------------------------------------------------------
+% %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+% %%
+% %% Licensed under the Apache License, Version 2.0 (the "License");
+% %% you may not use this file except in compliance with the License.
+% %% You may obtain a copy of the License at
+% %% http://www.apache.org/licenses/LICENSE-2.0
+% %%
+% %% Unless required by applicable law or agreed to in writing, software
+% %% distributed under the License is distributed on an "AS IS" BASIS,
+% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% %% See the License for the specific language governing permissions and
+% %% limitations under the License.
+% %%--------------------------------------------------------------------
+
+-module(emqx_connector_redis_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx_connector.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(REDIS_HOST, "redis").
+-define(REDIS_PORT, 6379).
+-define(REDIS_RESOURCE_MOD, emqx_connector_redis).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+init_per_suite(Config) ->
+    case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
+            Config;
+        false ->
+            {skip, no_redis}
+    end.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_connector]).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+% %%------------------------------------------------------------------------------
+% %% Testcases
+% %%------------------------------------------------------------------------------
+
+t_single_lifecycle(_Config) ->
+    perform_lifecycle_check(
+        <<"emqx_connector_redis_SUITE_single">>,
+        redis_config_single(),
+        [<<"PING">>]
+    ).
+
+t_cluster_lifecycle(_Config) ->
+    perform_lifecycle_check(
+        <<"emqx_connector_redis_SUITE_cluster">>,
+        redis_config_cluster(),
+        [<<"PING">>, <<"PONG">>]
+    ).
+
+t_sentinel_lifecycle(_Config) ->
+    perform_lifecycle_check(
+        <<"emqx_connector_redis_SUITE_sentinel">>,
+        redis_config_sentinel(),
+        [<<"PING">>]
+    ).
+
+perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
+    {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig),
+    {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local(
+        PoolName,
+        ?CONNECTOR_RESOURCE_GROUP,
+        ?REDIS_RESOURCE_MOD,
+        CheckedConfig
+    ),
+    ?assertEqual(InitialStatus, started),
+    % Instance should match the state and status of the just started resource
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
+    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    % Perform query as further check that the resource is working as expected
+    ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Resource will be listed still, but state will be changed and healthcheck will fail
+    % as the worker no longer exists.
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
+    ?assertEqual(StoppedStatus, stopped),
+    ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
+    % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Can call stop/1 again on an already stopped instance
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Make sure it can be restarted and the healthchecks and queries work properly
+    ?assertEqual(ok, emqx_resource:restart(PoolName)),
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName),
+    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
+    % Stop and remove the resource in one go.
+    ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Should not even be able to get the resource data out of ets now unlike just stopping.
+    ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
+
+% %%------------------------------------------------------------------------------
+% %% Helpers
+% %%------------------------------------------------------------------------------
+
+redis_config_single() ->
+    redis_config_base("single", "server").
+
+redis_config_cluster() ->
+    redis_config_base("cluster", "servers").
+
+redis_config_sentinel() ->
+    redis_config_base("sentinel", "servers").
+
+redis_config_base(Type, ServerKey) ->
+    RawConfig = list_to_binary(io_lib:format("""
+    auto_reconnect = true
+    database = 1
+    pool_size = 8
+    redis_type = ~s
+    password = public
+    ~s = \"~s:~b\"
+    """, [Type, ServerKey, ?REDIS_HOST, ?REDIS_PORT])),
+
+    {ok, Config} = hocon:binary(RawConfig),
+    #{<<"config">> => Config}.