emqx_bridge_pgsql_SUITE.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  %% SQL statements used by this suite. ?SQL_BRIDGE is the statement template
  %% placed in the bridge configuration; the others are executed directly
  %% against the database by the connect_and_* helpers.
  -define(SQL_BRIDGE,
      "INSERT INTO mqtt_test(payload, arrived) "
      "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  ).
  -define(SQL_CREATE_TABLE,
      "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  ).
  -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  -define(SQL_DELETE, "DELETE from mqtt_test").
  -define(SQL_SELECT, "SELECT payload FROM mqtt_test").

  %% Database name and credentials used both in the bridge configuration and
  %% for the direct connection.
  -define(PGSQL_DATABASE, "mqtt").
  -define(PGSQL_USERNAME, "root").
  -define(PGSQL_PASSWORD, "public").

  %% batch_size written into the bridge configuration when batching is enabled.
  -define(BATCH_SIZE, 10).
  26. %%------------------------------------------------------------------------------
  27. %% CT boilerplate
  28. %%------------------------------------------------------------------------------
  %% Common Test entry point: the suite consists of one group per transport.
  all() ->
      [{group, Transport} || Transport <- [tcp, tls]].
  %% Group tree: transport (tcp | tls) -> query mode (async | sync) ->
  %% batch/bridge variant (with_batch | without_batch | matrix | timescale).
  %% t_write_timeout is excluded from the batch-enabled run; the matrix and
  %% timescale variants only run the two publish testcases.
  groups() ->
      AllCases = emqx_common_test_helpers:all(?MODULE),
      VariantRefs = [
          {group, Variant}
       || Variant <- [with_batch, without_batch, matrix, timescale]
      ],
      ModeGroups = [{Mode, VariantRefs} || Mode <- [async, sync]],
      PublishCases = [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish],
      [
          {tcp, ModeGroups},
          {tls, ModeGroups},
          {async, VariantRefs},
          {sync, VariantRefs},
          {with_batch, AllCases -- [t_write_timeout]},
          {without_batch, AllCases},
          {matrix, PublishCases},
          {timescale, PublishCases}
      ].
  %% Per-group setup.
  %%  - tcp / tls: record the database endpoint (overridable through the
  %%    PGSQL_*_HOST / PGSQL_*_PORT environment variables), the TLS flag and
  %%    the proxy name.
  %%  - async / sync: record the query mode.
  %%  - with_batch / without_batch / matrix / timescale: record the batch flag
  %%    (and the bridge type for matrix / timescale), then run common_init/1.
  %%  - anything else: Config is returned unchanged.
  init_per_group(tcp, Config) ->
      TcpHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
      TcpPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
      Endpoint = [
          {pgsql_host, TcpHost},
          {pgsql_port, TcpPort},
          {enable_tls, false},
          {proxy_name, "pgsql_tcp"}
      ],
      Endpoint ++ Config;
  init_per_group(tls, Config) ->
      TlsHost = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
      TlsPort = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
      Endpoint = [
          {pgsql_host, TlsHost},
          {pgsql_port, TlsPort},
          {enable_tls, true},
          {proxy_name, "pgsql_tls"}
      ],
      Endpoint ++ Config;
  init_per_group(Mode, Config) when Mode =:= async; Mode =:= sync ->
      [{query_mode, Mode} | Config];
  init_per_group(with_batch, Config) ->
      common_init([{enable_batch, true} | Config]);
  init_per_group(without_batch, Config) ->
      common_init([{enable_batch, false} | Config]);
  init_per_group(Variant, Config) when Variant =:= matrix; Variant =:= timescale ->
      %% The bridge type is the group name as a binary.
      common_init([{bridge_type, atom_to_binary(Variant)}, {enable_batch, true} | Config]);
  init_per_group(_Other, Config) ->
      Config.
  %% Per-group teardown.
  %%
  %% Every group whose init_per_group/2 clause goes through common_init/1
  %% (with_batch, without_batch, matrix and timescale) creates the test table
  %% and resets the proxy, so each of them is torn down here: the table is
  %% dropped and the proxy is reset. Previously matrix and timescale were
  %% skipped, which left the table behind after those groups finished.
  %% All other groups need no cleanup.
  end_per_group(Group, Config) when
      Group =:= with_batch;
      Group =:= without_batch;
      Group =:= matrix;
      Group =:= timescale
  ->
      connect_and_drop_table(Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
      ok;
  end_per_group(_Group, _Config) ->
      ok.
  %% No suite-level setup is done here; the configuration is passed through
  %% unchanged.
  init_per_suite(SuiteConfig) ->
      SuiteConfig.
  %% Suite teardown: shuts down the management API test helper, then stops the
  %% emqx_bridge and emqx_conf applications. Returns ok.
  end_per_suite(_Config) ->
      emqx_mgmt_api_test_util:end_suite(),
      ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]).
  %% Per-testcase setup: empties the test table, removes the bridge, and starts
  %% a snabbkaffe trace. Returns Config unchanged.
  init_per_testcase(_Testcase, Config) ->
      ResetSteps = [fun connect_and_clear_table/1, fun delete_bridge/1],
      lists:foreach(fun(Step) -> Step(Config) end, ResetSteps),
      snabbkaffe:start_trace(),
      Config.
  %% Per-testcase teardown, in this order: reset the proxy, empty the test
  %% table, stop snabbkaffe, remove the bridge.
  end_per_testcase(_Testcase, Config) ->
      emqx_common_test_helpers:reset_proxy(
          ?config(proxy_host, Config),
          ?config(proxy_port, Config)
      ),
      connect_and_clear_table(Config),
      ok = snabbkaffe:stop(),
      delete_bridge(Config),
      ok.
  119. %%------------------------------------------------------------------------------
  120. %% Helper fns
  121. %%------------------------------------------------------------------------------
  %% Shared setup for the batch/bridge-variant groups.
  %%
  %% Checks whether the database endpoint stored in Config0 is reachable. If it
  %% is, the environment is prepared and Config0 is returned extended with
  %% pgsql_config, pgsql_bridge_type, pgsql_name, proxy_host and proxy_port.
  %% If it is not, the group is skipped, except when the IS_CI environment
  %% variable is "yes", in which case no_pgsql is thrown.
  common_init(Config0) ->
      PgHost = ?config(pgsql_host, Config0),
      PgPort = ?config(pgsql_port, Config0),
      Reachable = emqx_common_test_helpers:is_tcp_server_available(PgHost, PgPort),
      common_init(Reachable, Config0).

  %% Second stage of common_init/1, dispatched on endpoint availability.
  common_init(false, _Config0) ->
      case os:getenv("IS_CI") of
          "yes" -> throw(no_pgsql);
          _ -> {skip, no_pgsql}
      end;
  common_init(true, Config0) ->
      %% Setup toxiproxy
      ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
      ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
      emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
      %% Ensure EE bridge module is loaded
      _ = application:load(emqx_ee_bridge),
      _ = emqx_ee_bridge:module_info(),
      ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
      emqx_mgmt_api_test_util:init_suite(),
      %% Connect to pgsql directly and create the table
      connect_and_create_table(Config0),
      %% The bridge type defaults to <<"pgsql">> when the group did not set one.
      BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
      {Name, PGConf} = pgsql_config(BridgeType, Config0),
      Extra = [
          {pgsql_config, PGConf},
          {pgsql_bridge_type, BridgeType},
          {pgsql_name, Name},
          {proxy_host, ProxyHost},
          {proxy_port, ProxyPort}
      ],
      Extra ++ Config0.
  %% Renders the HOCON configuration for a bridge of type BridgeType from the
  %% values in Config (endpoint, batch flag, query mode, TLS flag) and passes
  %% it to parse_and_check/3.
  %%
  %% Returns {Name, BridgeConfig}, where Name is this module's name as a
  %% binary. The batch size is ?BATCH_SIZE when batching is enabled and 1
  %% otherwise.
  pgsql_config(BridgeType, Config) ->
      PortStr = integer_to_list(?config(pgsql_port, Config)),
      Server = ?config(pgsql_host, Config) ++ ":" ++ PortStr,
      Name = atom_to_binary(?MODULE),
      BatchSize =
          case ?config(enable_batch, Config) of
              false -> 1;
              true -> ?BATCH_SIZE
          end,
      QueryMode = ?config(query_mode, Config),
      TlsEnabled = ?config(enable_tls, Config),
      Template =
          "bridges.~s.~s {\n"
          " enable = true\n"
          " server = ~p\n"
          " database = ~p\n"
          " username = ~p\n"
          " password = ~p\n"
          " sql = ~p\n"
          " resource_opts = {\n"
          " request_ttl = 500ms\n"
          " batch_size = ~b\n"
          " query_mode = ~s\n"
          " }\n"
          " ssl = {\n"
          " enable = ~w\n"
          " }\n"
          "}",
      %% Arguments in the order the placeholders appear in Template.
      Args = [
          BridgeType,
          Name,
          Server,
          ?PGSQL_DATABASE,
          ?PGSQL_USERNAME,
          ?PGSQL_PASSWORD,
          ?SQL_BRIDGE,
          BatchSize,
          QueryMode,
          TlsEnabled
      ],
      ConfigString = io_lib:format(Template, Args),
      {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  %% Parses ConfigString as HOCON, runs it through the bridge schema check, and
  %% returns the raw (parsed, not schema-converted) configuration map found
  %% under bridges.BridgeType.Name. The result of the schema check itself is
  %% discarded.
  parse_and_check(ConfigString, BridgeType, Name) ->
      {ok, Parsed} = hocon:binary(ConfigString, #{format => map}),
      CheckOpts = #{required => false, atom_key => false},
      hocon_tconf:check_plain(emqx_bridge_schema, Parsed, CheckOpts),
      #{<<"bridges">> := Bridges} = Parsed,
      #{BridgeType := #{Name := BridgeConf}} = Bridges,
      BridgeConf.
  %% Creates the bridge described by Config with no configuration overrides.
  create_bridge(Config) ->
      create_bridge(Config, #{}).

  %% Creates the bridge described by Config after deep-merging Overrides into
  %% the stored bridge configuration. Returns whatever emqx_bridge:create/3
  %% returns.
  create_bridge(Config, Overrides) ->
      Type = ?config(pgsql_bridge_type, Config),
      BridgeName = ?config(pgsql_name, Config),
      Merged = emqx_utils_maps:deep_merge(?config(pgsql_config, Config), Overrides),
      emqx_bridge:create(Type, BridgeName, Merged).
  %% Removes the bridge identified by the type and name stored in Config.
  delete_bridge(Config) ->
      emqx_bridge:remove(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ).
  %% Creates a bridge by POSTing Params to the "bridges" API path.
  %% On success the response body is JSON-decoded into maps and returned as
  %% {ok, Decoded}; any other reply is returned unchanged.
  create_bridge_http(Params) ->
      Url = emqx_mgmt_api_test_util:api_path(["bridges"]),
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      Reply = emqx_mgmt_api_test_util:request_api(post, Url, "", Auth, Params),
      case Reply of
          {ok, Body} -> {ok, emqx_utils_json:decode(Body, [return_maps])};
          _ -> Reply
      end.
  %% Sends Payload through the bridge identified by the name and type stored
  %% in Config.
  send_message(Config, Payload) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      emqx_bridge:send_message(emqx_bridge_resource:bridge_id(Type, BridgeName), Payload).
  %% Issues Request against the bridge's resource with a timeout option of
  %% 1000 and returns the result of emqx_resource:query/3.
  query_resource(Config, Request) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      QueryOpts = #{timeout => 1_000},
      emqx_resource:query(emqx_bridge_resource:resource_id(Type, BridgeName), Request, QueryOpts).
  %% Same as query_resource_async/3 with no options.
  query_resource_async(Config, Request) ->
      query_resource_async(Config, Request, #{}).

  %% Issues Request against the bridge's resource, supplying a reply fun that
  %% sends {result, Alias, Result} to a freshly created process alias.
  %% Opts may carry `timeout' (default 500).
  %% Returns {QueryReturn, Alias}; pass Alias to receive_result/2 to collect
  %% the reply.
  query_resource_async(Config, Request, Opts) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      ReplyAlias = alias([reply]),
      Notify = fun(Result) -> ReplyAlias ! {result, ReplyAlias, Result} end,
      ResourceID = emqx_bridge_resource:resource_id(Type, BridgeName),
      QueryOpts = #{
          timeout => maps:get(timeout, Opts, 500),
          async_reply_fun => {Notify, []}
      },
      {emqx_resource:query(ResourceID, Request, QueryOpts), ReplyAlias}.
  %% Waits up to Timeout for a reply tagged with Ref, accepting either
  %% {result, Ref, Reply} or {Ref, Reply}.
  %% Returns {ok, Reply}, or the atom timeout if nothing matching arrives.
  receive_result(Ref, Timeout) ->
      receive
          {Ref, Reply} -> {ok, Reply};
          {result, Ref, Reply} -> {ok, Reply}
      after Timeout -> timeout
      end.
  %% Opens an epgsql connection to the endpoint stored in Config using the
  %% suite's database name and credentials. When enable_tls is true, the ssl
  %% and ssl_opts options are added. Returns the connection; crashes with a
  %% badmatch if the connection cannot be established.
  connect_direct_pgsql(Config) ->
      BaseOpts = #{
          host => ?config(pgsql_host, Config),
          port => ?config(pgsql_port, Config),
          username => ?PGSQL_USERNAME,
          password => ?PGSQL_PASSWORD,
          database => ?PGSQL_DATABASE
      },
      ConnOpts =
          case ?config(enable_tls, Config) of
              false ->
                  BaseOpts;
              true ->
                  TlsOpts = emqx_tls_lib:to_client_opts(#{enable => true}),
                  BaseOpts#{ssl => true, ssl_opts => TlsOpts}
          end,
      {ok, Conn} = epgsql:connect(ConnOpts),
      Conn.
  278. % These funs connect and then stop the pgsql connection
  %% Opens a direct connection, runs ?SQL_CREATE_TABLE, and closes the
  %% connection. Returns ok.
  connect_and_create_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _Cols, _Rows} = epgsql:squery(Conn, ?SQL_CREATE_TABLE),
      ok = epgsql:close(Conn).
  %% Opens a direct connection, runs ?SQL_DROP_TABLE, and closes the
  %% connection. Returns ok.
  connect_and_drop_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _Cols, _Rows} = epgsql:squery(Conn, ?SQL_DROP_TABLE),
      ok = epgsql:close(Conn).
  %% Opens a direct connection, runs ?SQL_DELETE, and closes the connection.
  %% Returns ok.
  connect_and_clear_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _Count} = epgsql:squery(Conn, ?SQL_DELETE),
      ok = epgsql:close(Conn).
  %% Opens a direct connection, runs ?SQL_SELECT, closes the connection, and
  %% returns the single selected value. Crashes with a badmatch unless the
  %% query yields exactly one one-column row.
  connect_and_get_payload(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _Cols, [{Payload}]} = epgsql:squery(Conn, ?SQL_SELECT),
      ok = epgsql:close(Conn),
      Payload.
  296. %%------------------------------------------------------------------------------
  297. %% Testcases
  298. %%------------------------------------------------------------------------------
  %% Creates the bridge from configuration, sends one message through it, and
  %% checks that the payload is stored in the table and that exactly one
  %% pgsql_connector_query_return event with the expected result was traced.
  t_setup_via_config_and_publish(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Val = integer_to_binary(erlang:unique_integer()),
      SentData = #{payload => Val, timestamp => 1668602148000},
      %% The traced result has a different shape with and without batching.
      CheckTrace = fun(Trace0) ->
          Returns = ?of_kind(pgsql_connector_query_return, Trace0),
          case ?config(enable_batch, Config) of
              false ->
                  ?assertMatch([#{result := {ok, 1}}], Returns);
              true ->
                  ?assertMatch([#{result := {_, [{ok, 1}]}}], Returns)
          end,
          ok
      end,
      ?check_trace(
          begin
              {_, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, SentData),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              ?assertMatch(Val, connect_and_get_payload(Config)),
              ok
          end,
          CheckTrace
      ),
      ok.
  %% Creates the bridge through the HTTP API, sends one message through it, and
  %% checks that the payload is stored in the table and that exactly one
  %% pgsql_connector_query_return event with the expected result was traced.
  %% In sync query mode the send result itself must be {ok, 1}.
  t_setup_via_http_api_and_publish(Config) ->
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      BaseConfig = ?config(pgsql_config, Config),
      QueryMode = ?config(query_mode, Config),
      %% The API expects the bridge name and type inside the request body.
      HttpParams = BaseConfig#{<<"name">> => Name, <<"type">> => BridgeType},
      ?assertMatch({ok, _}, create_bridge_http(HttpParams)),
      Val = integer_to_binary(erlang:unique_integer()),
      SentData = #{payload => Val, timestamp => 1668602148000},
      %% The traced result has a different shape with and without batching.
      CheckTrace = fun(Trace0) ->
          Returns = ?of_kind(pgsql_connector_query_return, Trace0),
          case ?config(enable_batch, Config) of
              false ->
                  ?assertMatch([#{result := {ok, 1}}], Returns);
              true ->
                  ?assertMatch([#{result := {_, [{ok, 1}]}}], Returns)
          end,
          ok
      end,
      ?check_trace(
          begin
              {Res, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, SentData),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              case QueryMode of
                  sync ->
                      ?assertEqual({ok, 1}, Res);
                  async ->
                      ok
              end,
              ?assertMatch(Val, connect_and_get_payload(Config)),
              ok
          end,
          CheckTrace
      ),
      ok.
  %% The health check reports `connected' while the proxy is up, and either
  %% `disconnected' or `connecting' while the proxy is down.
  t_get_status(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      ProxyPort = ?config(proxy_port, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyName = ?config(proxy_name, Config),
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
      CheckWhileDown = fun() ->
          ?assertMatch(
              {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
              emqx_resource_manager:health_check(ResourceID)
          )
      end,
      emqx_common_test_helpers:with_failure(
          down, ProxyName, ProxyHost, ProxyPort, CheckWhileDown
      ),
      ok.
  %% Creating the bridge while the proxy is down must still return {ok, _},
  %% and exactly one pgsql_connector_start_failed event carrying a
  %% start_pool_failed error must be traced.
  t_create_disconnected(Config) ->
      ProxyPort = ?config(proxy_port, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyName = ?config(proxy_name, Config),
      CreateWhileDown = fun() -> ?assertMatch({ok, _}, create_bridge(Config)) end,
      CheckTrace = fun(Trace) ->
          StartFailures = ?of_kind(pgsql_connector_start_failed, Trace),
          ?assertMatch([#{error := {start_pool_failed, _, _}}], StartFailures),
          ok
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down, ProxyName, ProxyHost, ProxyPort, CreateWhileDown
          ),
          CheckTrace
      ),
      ok.
  %% Sends a message while the proxy is down and expects a
  %% buffer_worker_flush_nack event within one second. In sync mode the send
  %% itself must return {error, _}. The first traced nack must carry either a
  %% resource_error or {recoverable_error, disconnected}.
  t_write_failure(Config) ->
      ProxyName = ?config(proxy_name, Config),
      ProxyPort = ?config(proxy_port, Config),
      ProxyHost = ?config(proxy_host, Config),
      QueryMode = ?config(query_mode, Config),
      {ok, _} = create_bridge(Config),
      Val = integer_to_binary(erlang:unique_integer()),
      SentData = #{payload => Val, timestamp => 1668602148000},
      Publish = fun
          (sync) -> ?assertMatch({error, _}, send_message(Config, SentData));
          (async) -> send_message(Config, SentData)
      end,
      PublishWhileDown = fun() ->
          {_, {ok, _}} =
              ?wait_async_action(
                  Publish(QueryMode),
                  #{?snk_kind := buffer_worker_flush_nack},
                  1_000
              )
      end,
      CheckTrace = fun(Trace0) ->
          ct:pal("trace: ~p", [Trace0]),
          Nacks = ?of_kind(buffer_worker_flush_nack, Trace0),
          ?assertMatch([#{result := {error, _}} | _], Nacks),
          [#{result := {error, Reason}} | _] = Nacks,
          case Reason of
              {recoverable_error, disconnected} ->
                  ok;
              {resource_error, _} ->
                  ok;
              _ ->
                  ct:fail("unexpected error: ~p", [Reason])
          end
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down, ProxyName, ProxyHost, ProxyPort, PublishWhileDown
          ),
          CheckTrace
      ),
      ok.
  %% This test doesn't work with batch enabled since it is not possible
  %% to set the timeout directly for batch queries.
  %%
  %% Issues a query while the proxy is in `timeout' failure mode and waits for
  %% a call_query_enter event. In sync mode the query must fail with a
  %% resource_error whose reason is timeout. In async mode a reply must arrive
  %% within 15 seconds and be either an unrecoverable error or a success.
  t_write_timeout(Config) ->
      ProxyName = ?config(proxy_name, Config),
      ProxyPort = ?config(proxy_port, Config),
      ProxyHost = ?config(proxy_host, Config),
      QueryMode = ?config(query_mode, Config),
      ResourceOpts = #{
          <<"resume_interval">> => <<"100ms">>,
          <<"health_check_interval">> => <<"100ms">>
      },
      {ok, _} = create_bridge(Config, #{<<"resource_opts">> => ResourceOpts}),
      Val = integer_to_binary(erlang:unique_integer()),
      SentData = #{payload => Val, timestamp => 1668602148000},
      {ok, SRef} = snabbkaffe:subscribe(
          ?match_event(#{?snk_kind := call_query_enter}),
          2_000
      ),
      Request = {send_message, SentData},
      IssueQuery = fun() ->
          QueryRes =
              case QueryMode of
                  sync ->
                      query_resource(Config, Request);
                  async ->
                      query_resource_async(Config, Request, #{timeout => 60_000})
              end,
          ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
          QueryRes
      end,
      Res0 = emqx_common_test_helpers:with_failure(
          timeout, ProxyName, ProxyHost, ProxyPort, IssueQuery
      ),
      case Res0 of
          {_, Ref} when is_reference(Ref) ->
              %% we may receive a successful result depending on
              %% timing, if the request is retried after the
              %% failure is healed.
              case receive_result(Ref, 15_000) of
                  {ok, {error, {unrecoverable_error, _}}} ->
                      ok;
                  {ok, {ok, _}} ->
                      ok;
                  {ok, Unexpected} ->
                      ct:fail("unexpected result: ~p", [Unexpected]);
                  timeout ->
                      ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
                      ct:fail("no response received")
              end;
          _ ->
              ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
      end,
      ok.
  %% Runs a raw {sql, ...} request against the resource. With batching enabled
  %% the expected result is a batch_prepare_not_implemented unrecoverable
  %% error; without batching the query must return a single row {1}.
  t_simple_sql_query(Config) ->
      EnableBatch = ?config(enable_batch, Config),
      QueryMode = ?config(query_mode, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"SELECT count(1) AS T">>},
      Result =
          case QueryMode of
              async ->
                  {_, Ref} = query_resource_async(Config, Request),
                  {ok, Reply} = receive_result(Ref, 2_000),
                  Reply;
              sync ->
                  query_resource(Config, Request)
          end,
      case EnableBatch of
          false ->
              ?assertMatch({ok, _, [{1}]}, Result);
          true ->
              ?assertEqual(
                  {error, {unrecoverable_error, batch_prepare_not_implemented}}, Result
              )
      end,
      ok.
  %% Sends an empty message and expects the buffer_worker_flush_ack event to
  %% carry an unrecoverable not_null_violation error (code 23502).
  t_missing_data(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      {_, {ok, AckEvent}} =
          ?wait_async_action(
              send_message(Config, #{}),
              #{?snk_kind := buffer_worker_flush_ack},
              2_000
          ),
      ?assertMatch(
          #{
              result :=
                  {error,
                      {unrecoverable_error,
                          {error, error, <<"23502">>, not_null_violation, _, _}}}
          },
          AckEvent
      ),
      ok.
  %% Runs a malformed {sql, ...} request against the resource. With batching
  %% enabled the expected result is an invalid_request unrecoverable error;
  %% without batching any unrecoverable error is accepted.
  t_bad_sql_parameter(Config) ->
      QueryMode = ?config(query_mode, Config),
      EnableBatch = ?config(enable_batch, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"">>, [bad_parameter]},
      Result =
          case QueryMode of
              async ->
                  {_, Ref} = query_resource_async(Config, Request),
                  {ok, Reply} = receive_result(Ref, 2_000),
                  Reply;
              sync ->
                  query_resource(Config, Request)
          end,
      case EnableBatch of
          false ->
              ?assertMatch({error, {unrecoverable_error, _}}, Result);
          true ->
              ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result)
      end,
      ok.
  %% Sends a payload made of every byte value from 1 to 127 and checks that the
  %% value read back from the table is identical.
  t_nasty_sql_string(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Payload = <<<<Byte>> || Byte <- lists:seq(1, 127)>>,
      Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
      {_, {ok, _}} =
          ?wait_async_action(
              send_message(Config, Message),
              #{?snk_kind := pgsql_connector_query_return},
              1_000
          ),
      ?assertEqual(Payload, connect_and_get_payload(Config)).