|
|
@@ -21,8 +21,8 @@
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
--define(BRIDGE_TYPE, mysql).
|
|
|
--define(BRIDGE_TYPE_BIN, <<"mysql">>).
|
|
|
+-define(ACTION_TYPE, mysql).
|
|
|
+-define(ACTION_TYPE_BIN, <<"mysql">>).
|
|
|
-define(CONNECTOR_TYPE, mysql).
|
|
|
-define(CONNECTOR_TYPE_BIN, <<"mysql">>).
|
|
|
|
|
|
@@ -39,6 +39,9 @@ all() ->
|
|
|
init_per_suite(Config) ->
|
|
|
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
|
|
|
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
|
|
|
+ ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
|
|
+ ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
|
|
+ emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
|
case emqx_common_test_helpers:is_tcp_server_available(MysqlHost, MysqlPort) of
|
|
|
true ->
|
|
|
Apps = emqx_cth_suite:start(
|
|
|
@@ -56,6 +59,9 @@ init_per_suite(Config) ->
|
|
|
),
|
|
|
NConfig = [
|
|
|
{apps, Apps},
|
|
|
+ {proxy_name, "mysql_tcp"},
|
|
|
+ {proxy_host, ProxyHost},
|
|
|
+ {proxy_port, ProxyPort},
|
|
|
{mysql_host, MysqlHost},
|
|
|
{mysql_port, MysqlPort},
|
|
|
{enable_tls, false},
|
|
|
@@ -86,6 +92,9 @@ end_per_group(_Group, _Config) ->
|
|
|
ok.
|
|
|
|
|
|
init_per_testcase(TestCase, Config) ->
|
|
|
+ ProxyHost = ?config(proxy_host, Config),
|
|
|
+ ProxyPort = ?config(proxy_port, Config),
|
|
|
+ emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
|
ct:timetrap(timer:seconds(60)),
|
|
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
|
emqx_config:delete_override_conf_files(),
|
|
|
@@ -109,7 +118,7 @@ init_per_testcase(TestCase, Config) ->
|
|
|
{connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
|
|
|
{connector_name, Name},
|
|
|
{connector_config, ConnectorConfig},
|
|
|
- {action_type, proplists:get_value(action_type, Config, ?BRIDGE_TYPE)},
|
|
|
+ {action_type, proplists:get_value(action_type, Config, ?ACTION_TYPE)},
|
|
|
{action_name, Name},
|
|
|
{bridge_config, BridgeConfig}
|
|
|
| NConfig
|
|
|
@@ -197,7 +206,7 @@ serde_roundtrip(InnerConfigMap0) ->
|
|
|
InnerConfigMap.
|
|
|
|
|
|
parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
|
|
- emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
|
|
|
+ emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, InnerConfigMap).
|
|
|
|
|
|
make_message() ->
|
|
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
@@ -208,6 +217,29 @@ make_message() ->
|
|
|
timestamp => 1668602148000
|
|
|
}.
|
|
|
|
|
|
+create_connector_api(Config, Overrides) ->
|
|
|
+ emqx_bridge_v2_testlib:simplify_result(
|
|
|
+ emqx_bridge_v2_testlib:create_connector_api(
|
|
|
+ Config, Overrides
|
|
|
+ )
|
|
|
+ ).
|
|
|
+
|
|
|
+create_action_api(Config) ->
|
|
|
+ emqx_bridge_v2_testlib:simplify_result(
|
|
|
+ emqx_bridge_v2_testlib:create_kind_api(
|
|
|
+ Config
|
|
|
+ )
|
|
|
+ ).
|
|
|
+
|
|
|
+get_connector_api(Config) ->
|
|
|
+ ConnectorType = ?config(connector_type, Config),
|
|
|
+ ConnectorName = ?config(connector_name, Config),
|
|
|
+ emqx_bridge_v2_testlib:simplify_result(
|
|
|
+ emqx_bridge_v2_testlib:get_connector_api(
|
|
|
+ ConnectorType, ConnectorName
|
|
|
+ )
|
|
|
+ ).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Testcases
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -270,3 +302,92 @@ t_update_with_invalid_prepare(Config) ->
|
|
|
WorkerPids
|
|
|
),
|
|
|
ok.
|
|
|
+
|
|
|
+t_timeout_disconnected_then_recover(Config) ->
|
|
|
+ ProxyName = ?config(proxy_name, Config),
|
|
|
+ ProxyHost = ?config(proxy_host, Config),
|
|
|
+ ProxyPort = ?config(proxy_port, Config),
|
|
|
+ ConnectorName = ?config(connector_name, Config),
|
|
|
+ ?check_trace(
|
|
|
+ emqx_bridge_v2_testlib:snk_timetrap(),
|
|
|
+ begin
|
|
|
+ %% 0) Bridge is initially healthy.
|
|
|
+ {201, _} = create_connector_api(
|
|
|
+ Config,
|
|
|
+ #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"500ms">>}}
|
|
|
+ ),
|
|
|
+ {201, _} = create_action_api(Config),
|
|
|
+ ?retry(
|
|
|
+ 500,
|
|
|
+ 10,
|
|
|
+ ?assertMatch(
|
|
|
+ {200, #{<<"status">> := <<"connected">>}},
|
|
|
+ get_connector_api(Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ RuleTopic = <<"t/mysql">>,
|
|
|
+ {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
|
|
|
+ ?ACTION_TYPE_BIN, RuleTopic, Config
|
|
|
+ ),
|
|
|
+ {ok, C} = emqtt:start_link(),
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+ Publisher = spawn_link(fun Rec() ->
|
|
|
+ emqtt:publish(C, RuleTopic, <<"aaa">>),
|
|
|
+ timer:sleep(20),
|
|
|
+ Rec()
|
|
|
+ end),
|
|
|
+ %% 1) A connector health check times out, connector becomes `connecting`.
|
|
|
+ ct:pal("starting timeout failure..."),
|
|
|
+ emqx_common_test_helpers:enable_failure(timeout, ProxyName, ProxyHost, ProxyPort),
|
|
|
+ ?retry(
|
|
|
+ 500,
|
|
|
+ 10,
|
|
|
+ ?assertMatch(
|
|
|
+ {200, #{<<"status">> := <<"connecting">>}},
|
|
|
+ get_connector_api(Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ %% 2) Connection is closed by MySQL, crashing the ecpool workers.
|
|
|
+ ct:pal("cutting connection..."),
|
|
|
+ emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort),
|
|
|
+ ?retry(
|
|
|
+ 500,
|
|
|
+ 10,
|
|
|
+ ?assertMatch(
|
|
|
+ {200, #{<<"status">> := <<"connecting">>}},
|
|
|
+ get_connector_api(Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ %% 3) Now recover but drop table, so that it becomes unhealthy.
|
|
|
+ ct:pal("restoring down and timeout failures..."),
|
|
|
+ emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
|
+ ct:pal("dropping table..."),
|
|
|
+ emqx_bridge_mysql_SUITE:connect_and_drop_table(Config),
|
|
|
+ ?retry(
|
|
|
+ 500,
|
|
|
+ 10,
|
|
|
+ ?assertMatch(
|
|
|
+ {200, #{<<"status">> := <<"disconnected">>}},
|
|
|
+ get_connector_api(Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ %% 4) MySQL becomes healthy again later after table is recreated, should
|
|
|
+ %% resume operations automatically.
|
|
|
+ ct:pal("recreating table..."),
|
|
|
+ emqx_bridge_mysql_SUITE:connect_and_create_table(Config),
|
|
|
+ ?retry(
|
|
|
+ 500,
|
|
|
+ 10,
|
|
|
+ ?assertMatch(
|
|
|
+ {200, #{<<"status">> := <<"connected">>}},
|
|
|
+ get_connector_api(Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ unlink(Publisher),
|
|
|
+ exit(Publisher, kill),
|
|
|
+ emqtt:stop(C),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ []
|
|
|
+ ),
|
|
|
+ ok.
|