|
|
@@ -24,6 +24,7 @@
|
|
|
-include_lib("stdlib/include/assert.hrl").
|
|
|
|
|
|
-define(MYSQL_HOST, "mysql").
|
|
|
+-define(MYSQL_RESOURCE_MOD, emqx_connector_mysql).
|
|
|
|
|
|
all() ->
|
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
@@ -34,100 +35,94 @@ groups() ->
|
|
|
init_per_suite(Config) ->
|
|
|
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
|
|
|
true ->
|
|
|
- ok = emqx_connector_test_helpers:start_apps([ecpool, mysql]),
|
|
|
+ ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
|
|
+ ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
|
|
|
Config;
|
|
|
false ->
|
|
|
{skip, no_mysql}
|
|
|
end.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
- ok = emqx_connector_test_helpers:stop_apps([ecpool, mysql]).
|
|
|
+ ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
|
|
+ ok = emqx_connector_test_helpers:stop_apps([emqx_resource, emqx_connector]).
|
|
|
|
|
|
init_per_testcase(_, Config) ->
|
|
|
- ?assertEqual(
|
|
|
- {ok, #{poolname => emqx_connector_mysql}},
|
|
|
- emqx_connector_mysql:on_start(<<"emqx_connector_mysql">>, mysql_config())
|
|
|
- ),
|
|
|
Config.
|
|
|
|
|
|
end_per_testcase(_, _Config) ->
|
|
|
- ?assertEqual(
|
|
|
- ok,
|
|
|
- emqx_connector_mysql:on_stop(<<"emqx_connector_mysql">>, #{poolname => emqx_connector_mysql})
|
|
|
- ).
|
|
|
+ ok.
|
|
|
|
|
|
% %%------------------------------------------------------------------------------
|
|
|
% %% Testcases
|
|
|
% %%------------------------------------------------------------------------------
|
|
|
|
|
|
-% Simple test to make sure the proper reference to the module is returned.
|
|
|
-t_roots(_Config) ->
|
|
|
- ExpectedRoots = [{config, #{type => {ref, emqx_connector_mysql, config}}}],
|
|
|
- ActualRoots = emqx_connector_mysql:roots(),
|
|
|
- ?assertEqual(ExpectedRoots, ActualRoots).
|
|
|
-
|
|
|
-% Not sure if this level of testing is appropriate for this function.
|
|
|
-% Checking the actual values/types of the returned term starts getting
|
|
|
-% into checking the emqx_connector_schema_lib.erl returns and the shape
|
|
|
-% of expected data elsewhere.
|
|
|
-t_fields(_Config) ->
|
|
|
- Fields = emqx_connector_mysql:fields(config),
|
|
|
- lists:foreach(
|
|
|
- fun({FieldName, FieldValue}) ->
|
|
|
- ?assert(is_atom(FieldName)),
|
|
|
- if
|
|
|
- is_map(FieldValue) ->
|
|
|
- ?assert(maps:is_key(type, FieldValue) and maps:is_key(default, FieldValue));
|
|
|
- true ->
|
|
|
- ?assert(is_function(FieldValue))
|
|
|
- end
|
|
|
- end,
|
|
|
- Fields
|
|
|
- ).
|
|
|
-
|
|
|
-% Execute a minimal query to validate connection.
|
|
|
-t_basic_query(_Config) ->
|
|
|
- ?assertMatch(
|
|
|
- {ok, _, [[1]]},
|
|
|
- emqx_connector_mysql:on_query(
|
|
|
- <<"emqx_connector_mysql">>, {sql, test_query()}, undefined, #{
|
|
|
- poolname => emqx_connector_mysql
|
|
|
- }
|
|
|
- )
|
|
|
+t_lifecycle(_Config) ->
|
|
|
+ perform_lifecycle_check(
|
|
|
+ <<"emqx_connector_mysql_SUITE">>,
|
|
|
+ mysql_config()
|
|
|
).
|
|
|
|
|
|
-% Perform health check.
|
|
|
-t_do_healthcheck(_Config) ->
|
|
|
- ?assertEqual(
|
|
|
- {ok, #{poolname => emqx_connector_mysql}},
|
|
|
- emqx_connector_mysql:on_health_check(<<"emqx_connector_mysql">>, #{
|
|
|
- poolname => emqx_connector_mysql
|
|
|
- })
|
|
|
- ).
|
|
|
-
|
|
|
-% Perform healthcheck on a connector that does not exist.
|
|
|
-t_healthceck_when_connector_does_not_exist(_Config) ->
|
|
|
- ?assertEqual(
|
|
|
- {error, health_check_failed, #{poolname => emqx_connector_mysql_does_not_exist}},
|
|
|
- emqx_connector_mysql:on_health_check(<<"emqx_connector_mysql_does_not_exist">>, #{
|
|
|
- poolname => emqx_connector_mysql_does_not_exist
|
|
|
- })
|
|
|
- ).
|
|
|
+perform_lifecycle_check(PoolName, InitialConfig) ->
|
|
|
+ {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
|
|
|
+ {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local(
|
|
|
+ PoolName,
|
|
|
+ ?CONNECTOR_RESOURCE_GROUP,
|
|
|
+ ?MYSQL_RESOURCE_MOD,
|
|
|
+ CheckedConfig
|
|
|
+ ),
|
|
|
+ ?assertEqual(InitialStatus, started),
|
|
|
+ % Instance should match the state and status of the just started resource
|
|
|
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
|
|
|
+ ?assertEqual(ok, emqx_resource:health_check(PoolName)),
|
|
|
+ % % Perform query as further check that the resource is working as expected
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())),
|
|
|
+ ?assertEqual(ok, emqx_resource:stop(PoolName)),
|
|
|
+ % Resource will be listed still, but state will be changed and healthcheck will fail
|
|
|
+ % as the worker no longer exists.
|
|
|
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
|
|
|
+ ?assertEqual(StoppedStatus, stopped),
|
|
|
+ ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
|
|
|
+ % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
|
|
+ ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
|
|
+ % Can call stop/1 again on an already stopped instance
|
|
|
+ ?assertEqual(ok, emqx_resource:stop(PoolName)),
|
|
|
+ % Make sure it can be restarted and the healthchecks and queries work properly
|
|
|
+ ?assertEqual(ok, emqx_resource:restart(PoolName)),
|
|
|
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName),
|
|
|
+ ?assertEqual(ok, emqx_resource:health_check(PoolName)),
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
|
|
|
+ ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())),
|
|
|
+ % Stop and remove the resource in one go.
|
|
|
+ ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
|
|
|
+ ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
|
|
+ % Should not even be able to get the resource data out of ets now unlike just stopping.
|
|
|
+ ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
|
|
|
|
|
|
% %%------------------------------------------------------------------------------
|
|
|
% %% Helpers
|
|
|
% %%------------------------------------------------------------------------------
|
|
|
|
|
|
mysql_config() ->
|
|
|
- #{
|
|
|
- auto_reconnect => true,
|
|
|
- database => <<"mqtt">>,
|
|
|
- username => <<"root">>,
|
|
|
- password => <<"public">>,
|
|
|
- pool_size => 8,
|
|
|
- server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
|
|
|
- ssl => #{enable => false}
|
|
|
- }.
|
|
|
-
|
|
|
-test_query() ->
|
|
|
- <<"SELECT 1">>.
|
|
|
+ RawConfig = list_to_binary(io_lib:format("""
|
|
|
+ auto_reconnect = true
|
|
|
+ database = mqtt
|
|
|
+ username= root
|
|
|
+ password = public
|
|
|
+ pool_size = 8
|
|
|
+ server = \"~s:~b\"
|
|
|
+ """, [?MYSQL_HOST, ?MYSQL_DEFAULT_PORT])),
|
|
|
+
|
|
|
+ {ok, Config} = hocon:binary(RawConfig),
|
|
|
+ #{<<"config">> => Config}.
|
|
|
+
|
|
|
+test_query_no_params() ->
|
|
|
+ {sql, <<"SELECT 1">>}.
|
|
|
+
|
|
|
+test_query_with_params() ->
|
|
|
+ {sql, <<"SELECT ?">>, [1]}.
|
|
|
+
|
|
|
+test_query_with_params_and_timeout() ->
|
|
|
+ {sql, <<"SELECT ?">>, [1], 1000}.
|