|
|
@@ -45,15 +45,16 @@ all() ->
|
|
|
groups() ->
|
|
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
|
|
|
+ BatchingGroups = [
|
|
|
+ {group, with_batch},
|
|
|
+ {group, without_batch}
|
|
|
+ ],
|
|
|
+ QueryModeGroups = [{group, async}, {group, sync}],
|
|
|
[
|
|
|
- {tcp, [
|
|
|
- {group, with_batch},
|
|
|
- {group, without_batch}
|
|
|
- ]},
|
|
|
- {tls, [
|
|
|
- {group, with_batch},
|
|
|
- {group, without_batch}
|
|
|
- ]},
|
|
|
+ {tcp, QueryModeGroups},
|
|
|
+ {tls, QueryModeGroups},
|
|
|
+ {async, BatchingGroups},
|
|
|
+ {sync, BatchingGroups},
|
|
|
{with_batch, TCs -- NonBatchCases},
|
|
|
{without_batch, TCs}
|
|
|
].
|
|
|
@@ -65,7 +66,6 @@ init_per_group(tcp, Config) ->
|
|
|
{mysql_host, MysqlHost},
|
|
|
{mysql_port, MysqlPort},
|
|
|
{enable_tls, false},
|
|
|
- {query_mode, sync},
|
|
|
{proxy_name, "mysql_tcp"}
|
|
|
| Config
|
|
|
];
|
|
|
@@ -76,10 +76,13 @@ init_per_group(tls, Config) ->
|
|
|
{mysql_host, MysqlHost},
|
|
|
{mysql_port, MysqlPort},
|
|
|
{enable_tls, true},
|
|
|
- {query_mode, sync},
|
|
|
{proxy_name, "mysql_tls"}
|
|
|
| Config
|
|
|
];
|
|
|
+init_per_group(async, Config) ->
|
|
|
+ [{query_mode, async} | Config];
|
|
|
+init_per_group(sync, Config) ->
|
|
|
+ [{query_mode, sync} | Config];
|
|
|
init_per_group(with_batch, Config0) ->
|
|
|
Config = [{batch_size, 100} | Config0],
|
|
|
common_init(Config);
|
|
|
@@ -99,6 +102,7 @@ end_per_group(_Group, _Config) ->
|
|
|
ok.
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
+ emqx_common_test_helpers:clear_screen(),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
@@ -109,6 +113,7 @@ end_per_suite(_Config) ->
|
|
|
init_per_testcase(_Testcase, Config) ->
|
|
|
connect_and_clear_table(Config),
|
|
|
delete_bridge(Config),
|
|
|
+ snabbkaffe:start_trace(),
|
|
|
Config.
|
|
|
|
|
|
end_per_testcase(_Testcase, Config) ->
|
|
|
@@ -237,6 +242,25 @@ query_resource(Config, Request) ->
|
|
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
|
emqx_resource:query(ResourceID, Request, #{timeout => 500}).
|
|
|
|
|
|
+query_resource_async(Config, Request) ->
|
|
|
+ Name = ?config(mysql_name, Config),
|
|
|
+ BridgeType = ?config(mysql_bridge_type, Config),
|
|
|
+ Ref = alias([reply]),
|
|
|
+ AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
|
|
+ ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
|
+ Return = emqx_resource:query(ResourceID, Request, #{
|
|
|
+ timeout => 500, async_reply_fun => {AsyncReplyFun, []}
|
|
|
+ }),
|
|
|
+ {Return, Ref}.
|
|
|
+
|
|
|
+receive_result(Ref, Timeout) ->
|
|
|
+ receive
|
|
|
+ {result, Ref, Result} ->
|
|
|
+ {ok, Result}
|
|
|
+ after Timeout ->
|
|
|
+ timeout
|
|
|
+ end.
|
|
|
+
|
|
|
unprepare(Config, Key) ->
|
|
|
Name = ?config(mysql_name, Config),
|
|
|
BridgeType = ?config(mysql_bridge_type, Config),
|
|
|
@@ -409,17 +433,29 @@ t_write_failure(Config) ->
|
|
|
Val = integer_to_binary(erlang:unique_integer()),
|
|
|
SentData = #{payload => Val, timestamp => 1668602148000},
|
|
|
?check_trace(
|
|
|
- emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
- case QueryMode of
|
|
|
- sync ->
|
|
|
- ?assertMatch(
|
|
|
- {error, {resource_error, #{reason := timeout}}},
|
|
|
+ begin
|
|
|
+ %% for some unknown reason, `?wait_async_action' and `subscribe'
|
|
|
+ %% hang and timeout if called inside `with_failure', but the event
|
|
|
+ %% happens and is emitted after the test pid dies!?
|
|
|
+ {ok, SRef} = snabbkaffe:subscribe(
|
|
|
+ ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
|
|
|
+ 2_000
|
|
|
+ ),
|
|
|
+ emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
+ case QueryMode of
|
|
|
+ sync ->
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {resource_error, #{reason := timeout}}},
|
|
|
+ send_message(Config, SentData)
|
|
|
+ );
|
|
|
+ async ->
|
|
|
send_message(Config, SentData)
|
|
|
- );
|
|
|
- async ->
|
|
|
- send_message(Config, SentData)
|
|
|
- end
|
|
|
- end),
|
|
|
+ end,
|
|
|
+ ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
|
|
|
+ ok
|
|
|
+ end),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
fun(Trace0) ->
|
|
|
ct:pal("trace: ~p", [Trace0]),
|
|
|
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
|
|
|
@@ -443,27 +479,52 @@ t_write_timeout(Config) ->
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
+ QueryMode = ?config(query_mode, Config),
|
|
|
{ok, _} = create_bridge(Config),
|
|
|
Val = integer_to_binary(erlang:unique_integer()),
|
|
|
SentData = #{payload => Val, timestamp => 1668602148000},
|
|
|
Timeout = 1000,
|
|
|
+ %% for some unknown reason, `?wait_async_action' and `subscribe'
|
|
|
+ %% hang and timeout if called inside `with_failure', but the event
|
|
|
+ %% happens and is emitted after the test pid dies!?
|
|
|
+ {ok, SRef} = snabbkaffe:subscribe(
|
|
|
+ ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
|
|
|
+ 2 * Timeout
|
|
|
+ ),
|
|
|
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
- ?assertMatch(
|
|
|
- {error, {resource_error, #{reason := timeout}}},
|
|
|
- query_resource(Config, {send_message, SentData, [], Timeout})
|
|
|
- )
|
|
|
+ case QueryMode of
|
|
|
+ sync ->
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {resource_error, #{reason := timeout}}},
|
|
|
+ query_resource(Config, {send_message, SentData, [], Timeout})
|
|
|
+ );
|
|
|
+ async ->
|
|
|
+ query_resource(Config, {send_message, SentData, [], Timeout}),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ ok
|
|
|
end),
|
|
|
+ ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
|
|
|
ok.
|
|
|
|
|
|
t_simple_sql_query(Config) ->
|
|
|
+ QueryMode = ?config(query_mode, Config),
|
|
|
+ BatchSize = ?config(batch_size, Config),
|
|
|
+ IsBatch = BatchSize > 1,
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
Request = {sql, <<"SELECT count(1) AS T">>},
|
|
|
- Result = query_resource(Config, Request),
|
|
|
- BatchSize = ?config(batch_size, Config),
|
|
|
- IsBatch = BatchSize > 1,
|
|
|
+ Result =
|
|
|
+ case QueryMode of
|
|
|
+ sync ->
|
|
|
+ query_resource(Config, Request);
|
|
|
+ async ->
|
|
|
+ {_, Ref} = query_resource_async(Config, Request),
|
|
|
+ {ok, Res} = receive_result(Ref, 2_000),
|
|
|
+ Res
|
|
|
+ end,
|
|
|
case IsBatch of
|
|
|
true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
|
|
|
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
|
|
|
@@ -471,25 +532,37 @@ t_simple_sql_query(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_missing_data(Config) ->
|
|
|
+ BatchSize = ?config(batch_size, Config),
|
|
|
+ IsBatch = BatchSize > 1,
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
- Result = send_message(Config, #{}),
|
|
|
- BatchSize = ?config(batch_size, Config),
|
|
|
- IsBatch = BatchSize > 1,
|
|
|
+ {ok, SRef} = snabbkaffe:subscribe(
|
|
|
+ ?match_event(#{?snk_kind := buffer_worker_flush_ack}),
|
|
|
+ 2_000
|
|
|
+ ),
|
|
|
+ send_message(Config, #{}),
|
|
|
+ {ok, [Event]} = snabbkaffe:receive_events(SRef),
|
|
|
case IsBatch of
|
|
|
true ->
|
|
|
?assertMatch(
|
|
|
- {error,
|
|
|
- {unrecoverable_error,
|
|
|
- {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
|
|
|
- Result
|
|
|
+ #{
|
|
|
+ result :=
|
|
|
+ {error,
|
|
|
+ {unrecoverable_error,
|
|
|
+ {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}
|
|
|
+ },
|
|
|
+ Event
|
|
|
);
|
|
|
false ->
|
|
|
?assertMatch(
|
|
|
- {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
|
|
|
- Result
|
|
|
+ #{
|
|
|
+ result :=
|
|
|
+ {error,
|
|
|
+ {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
|
|
|
+ },
|
|
|
+ Event
|
|
|
)
|
|
|
end,
|
|
|
ok.
|
|
|
@@ -500,14 +573,22 @@ t_bad_sql_parameter(Config) ->
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
Request = {sql, <<"">>, [bad_parameter]},
|
|
|
- Result = query_resource(Config, Request),
|
|
|
+ {_, {ok, Event}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ query_resource(Config, Request),
|
|
|
+ #{?snk_kind := buffer_worker_flush_ack},
|
|
|
+ 2_000
|
|
|
+ ),
|
|
|
BatchSize = ?config(batch_size, Config),
|
|
|
IsBatch = BatchSize > 1,
|
|
|
case IsBatch of
|
|
|
true ->
|
|
|
- ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
|
|
|
+ ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
|
|
|
false ->
|
|
|
- ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
|
|
|
+ ?assertMatch(
|
|
|
+ #{result := {error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}},
|
|
|
+ Event
|
|
|
+ )
|
|
|
end,
|
|
|
ok.
|
|
|
|
|
|
@@ -515,7 +596,12 @@ t_nasty_sql_string(Config) ->
|
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
|
Payload = list_to_binary(lists:seq(0, 255)),
|
|
|
Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
|
|
|
- Result = send_message(Config, Message),
|
|
|
+ {Result, {ok, _}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ send_message(Config, Message),
|
|
|
+ #{?snk_kind := mysql_connector_query_return},
|
|
|
+ 1_000
|
|
|
+ ),
|
|
|
?assertEqual(ok, Result),
|
|
|
?assertMatch(
|
|
|
{ok, [<<"payload">>], [[Payload]]},
|
|
|
@@ -561,12 +647,22 @@ t_unprepared_statement_query(Config) ->
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
Request = {prepared_query, unprepared_query, []},
|
|
|
- Result = query_resource(Config, Request),
|
|
|
+ {_, {ok, Event}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ query_resource(Config, Request),
|
|
|
+ #{?snk_kind := buffer_worker_flush_ack},
|
|
|
+ 2_000
|
|
|
+ ),
|
|
|
BatchSize = ?config(batch_size, Config),
|
|
|
IsBatch = BatchSize > 1,
|
|
|
case IsBatch of
|
|
|
- true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
|
|
|
- false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
|
|
|
+ true ->
|
|
|
+ ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
|
|
|
+ false ->
|
|
|
+ ?assertMatch(
|
|
|
+ #{result := {error, {unrecoverable_error, prepared_statement_invalid}}},
|
|
|
+ Event
|
|
|
+ )
|
|
|
end,
|
|
|
ok.
|
|
|
|
|
|
@@ -582,7 +678,13 @@ t_uninitialized_prepared_statement(Config) ->
|
|
|
unprepare(Config, send_message),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
- ?assertEqual(ok, send_message(Config, SentData)),
|
|
|
+ {Res, {ok, _}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ send_message(Config, SentData),
|
|
|
+ #{?snk_kind := mysql_connector_query_return},
|
|
|
+ 2_000
|
|
|
+ ),
|
|
|
+ ?assertEqual(ok, Res),
|
|
|
ok
|
|
|
end,
|
|
|
fun(Trace) ->
|