emqx_bridge_pgsql_SUITE.erl 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
  11. % SQL definitions
  12. -define(SQL_BRIDGE,
  13. "INSERT INTO mqtt_test(payload, arrived) "
  14. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  15. ).
  16. -define(SQL_CREATE_TABLE,
  17. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  18. ).
  19. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  20. -define(SQL_DELETE, "DELETE from mqtt_test").
  21. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  22. % DB defaults
  23. -define(PGSQL_DATABASE, "mqtt").
  24. -define(PGSQL_USERNAME, "root").
  25. -define(PGSQL_PASSWORD, "public").
  26. -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. [
  32. {group, tcp},
  33. {group, tls}
  34. ].
  35. groups() ->
  36. TCs = emqx_common_test_helpers:all(?MODULE),
  37. NonBatchCases = [t_write_timeout],
  38. BatchVariantGroups = [
  39. {group, with_batch},
  40. {group, without_batch},
  41. {group, matrix},
  42. {group, timescale}
  43. ],
  44. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  45. [
  46. {tcp, QueryModeGroups},
  47. {tls, QueryModeGroups},
  48. {async, BatchVariantGroups},
  49. {sync, BatchVariantGroups},
  50. {with_batch, TCs -- NonBatchCases},
  51. {without_batch, TCs},
  52. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  53. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  54. ].
  55. init_per_group(tcp, Config) ->
  56. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  57. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  58. [
  59. {pgsql_host, Host},
  60. {pgsql_port, Port},
  61. {enable_tls, false},
  62. {proxy_name, "pgsql_tcp"}
  63. | Config
  64. ];
  65. init_per_group(tls, Config) ->
  66. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  67. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  68. [
  69. {pgsql_host, Host},
  70. {pgsql_port, Port},
  71. {enable_tls, true},
  72. {proxy_name, "pgsql_tls"}
  73. | Config
  74. ];
  75. init_per_group(async, Config) ->
  76. [{query_mode, async} | Config];
  77. init_per_group(sync, Config) ->
  78. [{query_mode, sync} | Config];
  79. init_per_group(with_batch, Config0) ->
  80. Config = [{enable_batch, true} | Config0],
  81. common_init(Config);
  82. init_per_group(without_batch, Config0) ->
  83. Config = [{enable_batch, false} | Config0],
  84. common_init(Config);
  85. init_per_group(matrix, Config0) ->
  86. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  87. common_init(Config);
  88. init_per_group(timescale, Config0) ->
  89. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  90. common_init(Config);
  91. init_per_group(_Group, Config) ->
  92. Config.
  93. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
  94. connect_and_drop_table(Config),
  95. ProxyHost = ?config(proxy_host, Config),
  96. ProxyPort = ?config(proxy_port, Config),
  97. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  98. ok;
  99. end_per_group(_Group, _Config) ->
  100. ok.
  101. init_per_suite(Config) ->
  102. Config.
  103. end_per_suite(_Config) ->
  104. emqx_mgmt_api_test_util:end_suite(),
  105. ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
  106. ok.
  107. init_per_testcase(_Testcase, Config) ->
  108. connect_and_clear_table(Config),
  109. delete_bridge(Config),
  110. snabbkaffe:start_trace(),
  111. Config.
  112. end_per_testcase(_Testcase, Config) ->
  113. ProxyHost = ?config(proxy_host, Config),
  114. ProxyPort = ?config(proxy_port, Config),
  115. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  116. connect_and_clear_table(Config),
  117. ok = snabbkaffe:stop(),
  118. delete_bridge(Config),
  119. ok.
  120. %%------------------------------------------------------------------------------
  121. %% Helper fns
  122. %%------------------------------------------------------------------------------
  123. common_init(Config0) ->
  124. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  125. Host = ?config(pgsql_host, Config0),
  126. Port = ?config(pgsql_port, Config0),
  127. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  128. true ->
  129. % Setup toxiproxy
  130. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  131. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  132. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  133. % Ensure enterprise bridge module is loaded
  134. ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
  135. _ = emqx_bridge_enterprise:module_info(),
  136. emqx_mgmt_api_test_util:init_suite(),
  137. % Connect to pgsql directly and create the table
  138. connect_and_create_table(Config0),
  139. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  140. Config =
  141. [
  142. {pgsql_config, PGConf},
  143. {pgsql_bridge_type, BridgeType},
  144. {pgsql_name, Name},
  145. {proxy_host, ProxyHost},
  146. {proxy_port, ProxyPort}
  147. | Config0
  148. ],
  149. Config;
  150. false ->
  151. case os:getenv("IS_CI") of
  152. "yes" ->
  153. throw(no_pgsql);
  154. _ ->
  155. {skip, no_pgsql}
  156. end
  157. end.
  158. pgsql_config(BridgeType, Config) ->
  159. Port = integer_to_list(?config(pgsql_port, Config)),
  160. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  161. Name = atom_to_binary(?MODULE),
  162. BatchSize =
  163. case ?config(enable_batch, Config) of
  164. true -> ?BATCH_SIZE;
  165. false -> 1
  166. end,
  167. QueryMode = ?config(query_mode, Config),
  168. TlsEnabled = ?config(enable_tls, Config),
  169. %% NOTE: supplying password through a file here, to verify that it works.
  170. Password = create_passfile(BridgeType, Config),
  171. ConfigString =
  172. io_lib:format(
  173. "bridges.~s.~s {"
  174. "\n enable = true"
  175. "\n server = ~p"
  176. "\n database = ~p"
  177. "\n username = ~p"
  178. "\n password = ~p"
  179. "\n sql = ~p"
  180. "\n resource_opts = {"
  181. "\n request_ttl = 500ms"
  182. "\n batch_size = ~b"
  183. "\n query_mode = ~s"
  184. "\n }"
  185. "\n ssl = {"
  186. "\n enable = ~w"
  187. "\n }"
  188. "\n }",
  189. [
  190. BridgeType,
  191. Name,
  192. Server,
  193. ?PGSQL_DATABASE,
  194. ?PGSQL_USERNAME,
  195. Password,
  196. ?SQL_BRIDGE,
  197. BatchSize,
  198. QueryMode,
  199. TlsEnabled
  200. ]
  201. ),
  202. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  203. create_passfile(BridgeType, Config) ->
  204. Filename = binary_to_list(BridgeType) ++ ".passfile",
  205. Filepath = filename:join(?config(priv_dir, Config), Filename),
  206. ok = file:write_file(Filepath, ?PGSQL_PASSWORD),
  207. "file://" ++ Filepath.
  208. parse_and_check(ConfigString, BridgeType, Name) ->
  209. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  210. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  211. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  212. Config.
  213. create_bridge(Config) ->
  214. create_bridge(Config, _Overrides = #{}).
  215. create_bridge(Config, Overrides) ->
  216. BridgeType = ?config(pgsql_bridge_type, Config),
  217. Name = ?config(pgsql_name, Config),
  218. PGConfig0 = ?config(pgsql_config, Config),
  219. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  220. emqx_bridge:create(BridgeType, Name, PGConfig).
  221. delete_bridge(Config) ->
  222. BridgeType = ?config(pgsql_bridge_type, Config),
  223. Name = ?config(pgsql_name, Config),
  224. emqx_bridge:remove(BridgeType, Name).
  225. create_bridge_http(Params) ->
  226. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  227. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  228. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  229. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  230. Error -> Error
  231. end.
  232. send_message(Config, Payload) ->
  233. Name = ?config(pgsql_name, Config),
  234. BridgeType = ?config(pgsql_bridge_type, Config),
  235. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  236. emqx_bridge:send_message(BridgeID, Payload).
  237. query_resource(Config, Msg = _Request) ->
  238. Name = ?config(pgsql_name, Config),
  239. BridgeType = ?config(pgsql_bridge_type, Config),
  240. emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}).
  241. query_resource_sync(Config, Request) ->
  242. Name = ?config(pgsql_name, Config),
  243. BridgeType = ?config(pgsql_bridge_type, Config),
  244. ActionId = emqx_bridge_v2:id(BridgeType, Name),
  245. emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
  246. query_resource_async(Config, Request) ->
  247. query_resource_async(Config, Request, _Opts = #{}).
  248. query_resource_async(Config, Request, Opts) ->
  249. Name = ?config(pgsql_name, Config),
  250. BridgeType = ?config(pgsql_bridge_type, Config),
  251. Ref = alias([reply]),
  252. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  253. Timeout = maps:get(timeout, Opts, 500),
  254. Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
  255. timeout => Timeout,
  256. async_reply_fun => {AsyncReplyFun, []}
  257. }),
  258. {Return, Ref}.
  259. receive_result(Ref, Timeout) ->
  260. receive
  261. {result, Ref, Result} ->
  262. {ok, Result};
  263. {Ref, Result} ->
  264. {ok, Result}
  265. after Timeout ->
  266. timeout
  267. end.
  268. connect_direct_pgsql(Config) ->
  269. Opts = #{
  270. host => ?config(pgsql_host, Config),
  271. port => ?config(pgsql_port, Config),
  272. username => ?PGSQL_USERNAME,
  273. password => ?PGSQL_PASSWORD,
  274. database => ?PGSQL_DATABASE
  275. },
  276. SslOpts =
  277. case ?config(enable_tls, Config) of
  278. true ->
  279. Opts#{
  280. ssl => true,
  281. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  282. };
  283. false ->
  284. Opts
  285. end,
  286. {ok, Con} = epgsql:connect(SslOpts),
  287. Con.
  288. % These funs connect and then stop the pgsql connection
  289. connect_and_create_table(Config) ->
  290. Con = connect_direct_pgsql(Config),
  291. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  292. ok = epgsql:close(Con).
  293. connect_and_drop_table(Config) ->
  294. Con = connect_direct_pgsql(Config),
  295. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  296. ok = epgsql:close(Con).
  297. connect_and_clear_table(Config) ->
  298. Con = connect_direct_pgsql(Config),
  299. _ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  300. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  301. ok = epgsql:close(Con).
  302. connect_and_get_payload(Config) ->
  303. Con = connect_direct_pgsql(Config),
  304. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  305. ok = epgsql:close(Con),
  306. Result.
  307. %%------------------------------------------------------------------------------
  308. %% Testcases
  309. %%------------------------------------------------------------------------------
  310. t_setup_via_config_and_publish(Config) ->
  311. ?assertMatch(
  312. {ok, _},
  313. create_bridge(Config)
  314. ),
  315. Val = integer_to_binary(erlang:unique_integer()),
  316. SentData = #{payload => Val, timestamp => 1668602148000},
  317. ?check_trace(
  318. begin
  319. {_, {ok, _}} =
  320. ?wait_async_action(
  321. send_message(Config, SentData),
  322. #{?snk_kind := pgsql_connector_query_return},
  323. 10_000
  324. ),
  325. ?assertMatch(
  326. Val,
  327. connect_and_get_payload(Config)
  328. ),
  329. ok
  330. end,
  331. fun(Trace0) ->
  332. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  333. case ?config(enable_batch, Config) of
  334. true ->
  335. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  336. false ->
  337. ?assertMatch([#{result := {ok, 1}}], Trace)
  338. end,
  339. ok
  340. end
  341. ),
  342. ok.
  343. t_setup_via_http_api_and_publish(Config) ->
  344. BridgeType = ?config(pgsql_bridge_type, Config),
  345. Name = ?config(pgsql_name, Config),
  346. PgsqlConfig0 = ?config(pgsql_config, Config),
  347. QueryMode = ?config(query_mode, Config),
  348. PgsqlConfig = PgsqlConfig0#{
  349. <<"name">> => Name,
  350. <<"type">> => BridgeType,
  351. %% NOTE: using literal passwords with HTTP API requests.
  352. <<"password">> => <<?PGSQL_PASSWORD>>
  353. },
  354. ?assertMatch(
  355. {ok, _},
  356. create_bridge_http(PgsqlConfig)
  357. ),
  358. Val = integer_to_binary(erlang:unique_integer()),
  359. SentData = #{payload => Val, timestamp => 1668602148000},
  360. ?check_trace(
  361. begin
  362. {Res, {ok, _}} =
  363. ?wait_async_action(
  364. send_message(Config, SentData),
  365. #{?snk_kind := pgsql_connector_query_return},
  366. 10_000
  367. ),
  368. case QueryMode of
  369. async ->
  370. ok;
  371. sync ->
  372. ?assertEqual({ok, 1}, Res)
  373. end,
  374. ?assertMatch(
  375. Val,
  376. connect_and_get_payload(Config)
  377. ),
  378. ok
  379. end,
  380. fun(Trace0) ->
  381. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  382. case ?config(enable_batch, Config) of
  383. true ->
  384. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  385. false ->
  386. ?assertMatch([#{result := {ok, 1}}], Trace)
  387. end,
  388. ok
  389. end
  390. ),
  391. ok.
  392. t_get_status(Config) ->
  393. ?assertMatch(
  394. {ok, _},
  395. create_bridge(Config)
  396. ),
  397. ProxyPort = ?config(proxy_port, Config),
  398. ProxyHost = ?config(proxy_host, Config),
  399. ProxyName = ?config(proxy_name, Config),
  400. Name = ?config(pgsql_name, Config),
  401. BridgeType = ?config(pgsql_bridge_type, Config),
  402. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
  403. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  404. ?assertMatch(
  405. #{status := Status} when Status =:= disconnected orelse Status =:= connecting,
  406. emqx_bridge_v2:health_check(BridgeType, Name)
  407. )
  408. end),
  409. ok.
  410. t_create_disconnected(Config) ->
  411. ProxyPort = ?config(proxy_port, Config),
  412. ProxyHost = ?config(proxy_host, Config),
  413. ProxyName = ?config(proxy_name, Config),
  414. ?check_trace(
  415. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  416. ?assertMatch({ok, _}, create_bridge(Config))
  417. end),
  418. fun(Trace) ->
  419. ?assertMatch(
  420. [#{error := {start_pool_failed, _, _}}],
  421. ?of_kind(pgsql_connector_start_failed, Trace)
  422. ),
  423. ok
  424. end
  425. ),
  426. ok.
  427. t_write_failure(Config) ->
  428. ProxyName = ?config(proxy_name, Config),
  429. ProxyPort = ?config(proxy_port, Config),
  430. ProxyHost = ?config(proxy_host, Config),
  431. QueryMode = ?config(query_mode, Config),
  432. {ok, _} = create_bridge(Config),
  433. Val = integer_to_binary(erlang:unique_integer()),
  434. SentData = #{payload => Val, timestamp => 1668602148000},
  435. ?check_trace(
  436. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  437. {_, {ok, _}} =
  438. ?wait_async_action(
  439. case QueryMode of
  440. sync ->
  441. ?assertMatch({error, _}, send_message(Config, SentData));
  442. async ->
  443. send_message(Config, SentData)
  444. end,
  445. #{?snk_kind := buffer_worker_flush_nack},
  446. 15_000
  447. )
  448. end),
  449. fun(Trace0) ->
  450. ct:pal("trace: ~p", [Trace0]),
  451. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  452. ?assertMatch([#{result := {error, _}} | _], Trace),
  453. [#{result := {error, Error}} | _] = Trace,
  454. case Error of
  455. {resource_error, _} ->
  456. ok;
  457. {recoverable_error, disconnected} ->
  458. ok;
  459. _ ->
  460. ct:fail("unexpected error: ~p", [Error])
  461. end
  462. end
  463. ),
  464. ok.
  465. % This test doesn't work with batch enabled since it is not possible
  466. % to set the timeout directly for batch queries
  467. t_write_timeout(Config) ->
  468. ProxyName = ?config(proxy_name, Config),
  469. ProxyPort = ?config(proxy_port, Config),
  470. ProxyHost = ?config(proxy_host, Config),
  471. QueryMode = ?config(query_mode, Config),
  472. {ok, _} = create_bridge(
  473. Config,
  474. #{
  475. <<"resource_opts">> => #{
  476. <<"resume_interval">> => <<"100ms">>,
  477. <<"health_check_interval">> => <<"100ms">>
  478. }
  479. }
  480. ),
  481. Val = integer_to_binary(erlang:unique_integer()),
  482. SentData = #{payload => Val, timestamp => 1668602148000},
  483. {ok, SRef} = snabbkaffe:subscribe(
  484. ?match_event(#{?snk_kind := call_query_enter}),
  485. 2_000
  486. ),
  487. Res0 =
  488. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  489. Res1 =
  490. case QueryMode of
  491. async ->
  492. query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
  493. sync ->
  494. query_resource(Config, {send_message, SentData})
  495. end,
  496. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  497. Res1
  498. end),
  499. case Res0 of
  500. {_, Ref} when is_reference(Ref) ->
  501. case receive_result(Ref, 15_000) of
  502. {ok, Res} ->
  503. %% we may receive a successful result depending on
  504. %% timing, if the request is retried after the
  505. %% failure is healed.
  506. case Res of
  507. {error, {unrecoverable_error, _}} ->
  508. ok;
  509. {ok, _} ->
  510. ok;
  511. _ ->
  512. ct:fail("unexpected result: ~p", [Res])
  513. end;
  514. timeout ->
  515. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  516. ct:fail("no response received")
  517. end;
  518. _ ->
  519. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  520. end,
  521. ok.
  522. t_simple_sql_query(Config) ->
  523. EnableBatch = ?config(enable_batch, Config),
  524. QueryMode = ?config(query_mode, Config),
  525. ?assertMatch(
  526. {ok, _},
  527. create_bridge(Config)
  528. ),
  529. Request = {sql, <<"SELECT count(1) AS T">>},
  530. Result =
  531. case QueryMode of
  532. sync ->
  533. query_resource(Config, Request);
  534. async ->
  535. {_, Ref} = query_resource_async(Config, Request),
  536. {ok, Res} = receive_result(Ref, 2_000),
  537. Res
  538. end,
  539. case EnableBatch of
  540. true ->
  541. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  542. false ->
  543. ?assertMatch({ok, _, [{1}]}, Result)
  544. end,
  545. ok.
  546. t_missing_data(Config) ->
  547. ?assertMatch(
  548. {ok, _},
  549. create_bridge(Config)
  550. ),
  551. {_, {ok, Event}} =
  552. ?wait_async_action(
  553. send_message(Config, #{}),
  554. #{?snk_kind := buffer_worker_flush_ack},
  555. 2_000
  556. ),
  557. ?assertMatch(
  558. #{
  559. result :=
  560. {error,
  561. {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
  562. },
  563. Event
  564. ),
  565. ok.
  566. t_bad_sql_parameter(Config) ->
  567. QueryMode = ?config(query_mode, Config),
  568. EnableBatch = ?config(enable_batch, Config),
  569. ?assertMatch(
  570. {ok, _},
  571. create_bridge(Config)
  572. ),
  573. Request = {sql, <<"">>, [bad_parameter]},
  574. Result =
  575. case QueryMode of
  576. sync ->
  577. query_resource(Config, Request);
  578. async ->
  579. {_, Ref} = query_resource_async(Config, Request),
  580. {ok, Res} = receive_result(Ref, 2_000),
  581. Res
  582. end,
  583. case EnableBatch of
  584. true ->
  585. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  586. false ->
  587. ?assertMatch(
  588. {error, {unrecoverable_error, _}}, Result
  589. )
  590. end,
  591. ok.
  592. t_nasty_sql_string(Config) ->
  593. ?assertMatch({ok, _}, create_bridge(Config)),
  594. Payload = list_to_binary(lists:seq(1, 127)),
  595. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  596. {_, {ok, _}} =
  597. ?wait_async_action(
  598. send_message(Config, Message),
  599. #{?snk_kind := pgsql_connector_query_return},
  600. 1_000
  601. ),
  602. ?assertEqual(Payload, connect_and_get_payload(Config)).
  603. t_missing_table(Config) ->
  604. Name = ?config(pgsql_name, Config),
  605. BridgeType = ?config(pgsql_bridge_type, Config),
  606. % ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  607. ?check_trace(
  608. begin
  609. connect_and_drop_table(Config),
  610. ?assertMatch({ok, _}, create_bridge(Config)),
  611. ?retry(
  612. _Sleep = 1_000,
  613. _Attempts = 20,
  614. ?assertMatch(
  615. #{status := Status} when Status == connecting orelse Status == disconnected,
  616. emqx_bridge_v2:health_check(BridgeType, Name)
  617. )
  618. ),
  619. Val = integer_to_binary(erlang:unique_integer()),
  620. SentData = #{payload => Val, timestamp => 1668602148000},
  621. ?assertMatch(
  622. {error, {resource_error, #{reason := unhealthy_target}}},
  623. query_resource(Config, {send_message, SentData})
  624. ),
  625. ok
  626. end,
  627. fun(Trace) ->
  628. ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)),
  629. ok
  630. end
  631. ),
  632. connect_and_create_table(Config),
  633. ok.
  634. t_table_removed(Config) ->
  635. Name = ?config(pgsql_name, Config),
  636. BridgeType = ?config(pgsql_bridge_type, Config),
  637. %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  638. ?check_trace(
  639. begin
  640. connect_and_create_table(Config),
  641. ?assertMatch({ok, _}, create_bridge(Config)),
  642. ?retry(
  643. _Sleep = 1_000,
  644. _Attempts = 20,
  645. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
  646. ),
  647. connect_and_drop_table(Config),
  648. Val = integer_to_binary(erlang:unique_integer()),
  649. SentData = #{payload => Val, timestamp => 1668602148000},
  650. ActionId = emqx_bridge_v2:id(BridgeType, Name),
  651. case query_resource_sync(Config, {ActionId, SentData}) of
  652. {error, {unrecoverable_error, _}} ->
  653. ok;
  654. ?RESOURCE_ERROR_M(not_connected, _) ->
  655. ok;
  656. Res ->
  657. ct:fail("unexpected result: ~p", [Res])
  658. end,
  659. ok
  660. end,
  661. []
  662. ),
  663. connect_and_create_table(Config),
  664. ok.
  665. t_concurrent_health_checks(Config) ->
  666. Name = ?config(pgsql_name, Config),
  667. BridgeType = ?config(pgsql_bridge_type, Config),
  668. ?check_trace(
  669. begin
  670. connect_and_create_table(Config),
  671. ?assertMatch({ok, _}, create_bridge(Config)),
  672. ?retry(
  673. _Sleep = 1_000,
  674. _Attempts = 20,
  675. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
  676. ),
  677. emqx_utils:pmap(
  678. fun(_) ->
  679. ?assertMatch(
  680. #{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)
  681. )
  682. end,
  683. lists:seq(1, 20)
  684. ),
  685. ok
  686. end,
  687. fun(Trace) ->
  688. ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
  689. ok
  690. end
  691. ),
  692. ok.