emqx_bridge_pgsql_SUITE.erl 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
  %% SQL used by the suite.
  %% SQL_BRIDGE is the statement template handed to the bridge; the other
  %% statements are issued over a direct connection to prepare and inspect
  %% the `mqtt_test` table.
  -define(SQL_BRIDGE,
      "INSERT INTO mqtt_test(payload, arrived) "
      "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  ).
  -define(SQL_CREATE_TABLE,
      "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  ).
  -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  -define(SQL_DELETE, "DELETE from mqtt_test").
  -define(SQL_SELECT, "SELECT payload FROM mqtt_test").

  %% Database name and credentials used for both the bridge and the direct connection.
  -define(PGSQL_DATABASE, "mqtt").
  -define(PGSQL_USERNAME, "root").
  -define(PGSQL_PASSWORD, "public").

  %% Batch size configured on the bridge when batching is enabled.
  -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  %% Common Test entry point: the suite is run once per transport group.
  all() ->
      [{group, Transport} || Transport <- [tcp, tls]].
  %% Group tree: transport (tcp | tls) -> query mode (async | sync) ->
  %% variant (with_batch | without_batch | matrix | timescale).
  groups() ->
      AllCases = emqx_common_test_helpers:all(?MODULE),
      %% t_write_timeout is excluded from the batching group (see that testcase).
      BatchCases = AllCases -- [t_write_timeout],
      SetupCases = [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish],
      Variants = [{group, Variant} || Variant <- [with_batch, without_batch, matrix, timescale]],
      Modes = [{Mode, Variants} || Mode <- [async, sync]],
      [
          {tcp, Modes},
          {tls, Modes},
          {async, Variants},
          {sync, Variants},
          {with_batch, BatchCases},
          {without_batch, AllCases},
          {matrix, SetupCases},
          {timescale, SetupCases}
      ].
  %% Per-group setup. Each level of the group tree adds its own keys to Config:
  %%   tcp | tls       -> pgsql_host, pgsql_port, enable_tls, proxy_name
  %%   async | sync    -> query_mode
  %%   variant groups  -> enable_batch (and bridge_type for matrix/timescale),
  %%                      then the shared setup in common_init/1.
  init_per_group(tcp, Config) ->
      [
          {pgsql_host, os:getenv("PGSQL_TCP_HOST", "toxiproxy")},
          {pgsql_port, list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432"))},
          {enable_tls, false},
          {proxy_name, "pgsql_tcp"}
          | Config
      ];
  init_per_group(tls, Config) ->
      [
          {pgsql_host, os:getenv("PGSQL_TLS_HOST", "toxiproxy")},
          {pgsql_port, list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433"))},
          {enable_tls, true},
          {proxy_name, "pgsql_tls"}
          | Config
      ];
  init_per_group(Mode, Config) when Mode =:= async; Mode =:= sync ->
      [{query_mode, Mode} | Config];
  init_per_group(with_batch, Config) ->
      common_init([{enable_batch, true} | Config]);
  init_per_group(without_batch, Config) ->
      common_init([{enable_batch, false} | Config]);
  init_per_group(Flavour, Config) when Flavour =:= matrix; Flavour =:= timescale ->
      %% The bridge type is the group name as a binary.
      common_init([{bridge_type, atom_to_binary(Flavour)}, {enable_batch, true} | Config]);
  init_per_group(_Group, Config) ->
      Config.
  %% Per-group teardown.
  %% Every group whose init goes through common_init/1 (with_batch,
  %% without_batch, matrix, timescale) gets its table dropped and the proxy
  %% reset. Previously matrix and timescale were set up but never torn
  %% down, which left the table behind after those groups.
  %% All other groups need no teardown.
  end_per_group(Group, Config) when
      Group =:= with_batch;
      Group =:= without_batch;
      Group =:= matrix;
      Group =:= timescale
  ->
      connect_and_drop_table(Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
      ok;
  end_per_group(_Group, _Config) ->
      ok.
  %% Suite-level setup is a no-op; the real setup happens per group in
  %% common_init/1. Returns the configuration unchanged.
  init_per_suite(SuiteConfig) ->
      SuiteConfig.
  %% Suite-level teardown: ends the management API test session and stops
  %% the applications started in common_init/1. Crashes if stopping the
  %% applications does not return `ok`.
  end_per_suite(_Config) ->
      _ = emqx_mgmt_api_test_util:end_suite(),
      AppsToStop = [emqx_bridge, emqx_conf],
      ok = emqx_common_test_helpers:stop_apps(AppsToStop).
  %% Per-testcase setup: empties the table, removes any existing bridge and
  %% starts a fresh snabbkaffe trace. Returns Config unchanged.
  init_per_testcase(_Testcase, Config) ->
      lists:foreach(
          fun(Reset) -> Reset(Config) end,
          [fun connect_and_clear_table/1, fun delete_bridge/1]
      ),
      _ = snabbkaffe:start_trace(),
      Config.
  %% Per-testcase teardown: resets the proxy first, then empties the table,
  %% stops snabbkaffe and removes the bridge.
  end_per_testcase(_Testcase, Config) ->
      emqx_common_test_helpers:reset_proxy(
          ?config(proxy_host, Config),
          ?config(proxy_port, Config)
      ),
      connect_and_clear_table(Config),
      ok = snabbkaffe:stop(),
      _ = delete_bridge(Config),
      ok.
  120. %%------------------------------------------------------------------------------
  121. %% Helper fns
  122. %%------------------------------------------------------------------------------
  %% Shared group setup.
  %% When the PostgreSQL endpoint from Config is reachable: resets the proxy,
  %% starts the required applications, creates the test table and returns
  %% Config extended with pgsql_config, pgsql_bridge_type, pgsql_name,
  %% proxy_host and proxy_port.
  %% When it is not reachable: throws `no_pgsql` if the IS_CI environment
  %% variable is "yes", otherwise returns `{skip, no_pgsql}`.
  common_init(Config0) ->
      PgHost = ?config(pgsql_host, Config0),
      PgPort = ?config(pgsql_port, Config0),
      case emqx_common_test_helpers:is_tcp_server_available(PgHost, PgPort) of
          false ->
              os:getenv("IS_CI") =:= "yes" andalso throw(no_pgsql),
              {skip, no_pgsql};
          true ->
              %% Proxy API endpoint (toxiproxy by default).
              ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
              ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
              emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
              ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
              %% Ensure the enterprise bridge module is loaded.
              _ = emqx_bridge_enterprise:module_info(),
              emqx_mgmt_api_test_util:init_suite(),
              %% Create the table over a direct connection.
              connect_and_create_table(Config0),
              %% Bridge type defaults to plain pgsql unless the group set one.
              BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
              {Name, PGConf} = pgsql_config(BridgeType, Config0),
              [
                  {pgsql_config, PGConf},
                  {pgsql_bridge_type, BridgeType},
                  {pgsql_name, Name},
                  {proxy_host, ProxyHost},
                  {proxy_port, ProxyPort}
                  | Config0
              ]
      end.
  %% Builds the bridge configuration for BridgeType from the group Config.
  %% Renders a HOCON snippet and passes it through parse_and_check/3.
  %% Returns {Name, BridgeConfig}, where Name is the suite module name as a binary.
  pgsql_config(BridgeType, Config) ->
      Name = atom_to_binary(?MODULE),
      Server = ?config(pgsql_host, Config) ++ ":" ++ integer_to_list(?config(pgsql_port, Config)),
      %% A batch size of 1 is used when batching is disabled.
      BatchSize =
          case ?config(enable_batch, Config) of
              false -> 1;
              true -> ?BATCH_SIZE
          end,
      Template =
          "bridges.~s.~s {\n"
          " enable = true\n"
          " server = ~p\n"
          " database = ~p\n"
          " username = ~p\n"
          " password = ~p\n"
          " sql = ~p\n"
          " resource_opts = {\n"
          " request_ttl = 500ms\n"
          " batch_size = ~b\n"
          " query_mode = ~s\n"
          " }\n"
          " ssl = {\n"
          " enable = ~w\n"
          " }\n"
          "}",
      Args = [
          BridgeType,
          Name,
          Server,
          ?PGSQL_DATABASE,
          ?PGSQL_USERNAME,
          ?PGSQL_PASSWORD,
          ?SQL_BRIDGE,
          BatchSize,
          ?config(query_mode, Config),
          ?config(enable_tls, Config)
      ],
      HoconText = io_lib:format(Template, Args),
      {Name, parse_and_check(HoconText, BridgeType, Name)}.
  %% Parses the HOCON text, runs it through the bridge schema check and
  %% returns the raw configuration map stored under bridges.BridgeType.Name.
  %% Crashes (badmatch) if parsing fails or that path is absent.
  parse_and_check(ConfigString, BridgeType, Name) ->
      {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
      CheckOpts = #{required => false, atom_key => false},
      %% Only the check itself matters here; the checked result is not used.
      _ = hocon_tconf:check_plain(emqx_bridge_schema, RawConf, CheckOpts),
      #{<<"bridges">> := Bridges} = RawConf,
      #{BridgeType := #{Name := BridgeConf}} = Bridges,
      BridgeConf.
  %% Creates the bridge described in Config with no configuration overrides.
  create_bridge(Config) ->
      create_bridge(Config, #{}).

  %% Creates the bridge described in Config after deep-merging Overrides
  %% into the stored bridge configuration. Returns whatever
  %% emqx_bridge:create/3 returns.
  create_bridge(Config, Overrides) ->
      MergedConf = emqx_utils_maps:deep_merge(?config(pgsql_config, Config), Overrides),
      emqx_bridge:create(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config),
          MergedConf
      ).
  %% Removes the bridge identified by the type and name stored in Config.
  delete_bridge(Config) ->
      emqx_bridge:remove(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ).
  %% Creates a bridge by POSTing Params to the "bridges" management API path.
  %% Returns {ok, DecodedBody} (JSON decoded to maps) on success; any other
  %% result of the request is returned unchanged.
  create_bridge_http(Params) ->
      Url = emqx_mgmt_api_test_util:api_path(["bridges"]),
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      case emqx_mgmt_api_test_util:request_api(post, Url, "", Auth, Params) of
          {ok, Body} ->
              {ok, emqx_utils_json:decode(Body, [return_maps])};
          Failure ->
              Failure
      end.
  %% Sends Payload through the bridge identified by Config.
  send_message(Config, Payload) ->
      BridgeID = emqx_bridge_resource:bridge_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      emqx_bridge:send_message(BridgeID, Payload).
  %% Issues Request against the bridge's resource with a 1000 ms timeout.
  query_resource(Config, Request) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      QueryOpts = #{timeout => 1_000},
      emqx_resource:query(ResourceID, Request, QueryOpts).
  %% Issues Request against the bridge's resource via
  %% emqx_resource_buffer_worker:simple_sync_query/2.
  query_resource_sync(Config, Request) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
  %% Same as query_resource_async/3 with default options.
  query_resource_async(Config, Request) ->
      query_resource_async(Config, Request, #{}).

  %% Issues Request against the bridge's resource with an async reply callback.
  %% The callback sends `{result, Ref, Result}` to a freshly created reply
  %% alias; collect it with receive_result/2.
  %% Opts may carry `timeout` (default 500).
  %% Returns {ImmediateReturn, Ref}.
  query_resource_async(Config, Request, Opts) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      Ref = alias([reply]),
      Notify = fun(Result) -> Ref ! {result, Ref, Result} end,
      QueryOpts = #{
          timeout => maps:get(timeout, Opts, 500),
          async_reply_fun => {Notify, []}
      },
      {emqx_resource:query(ResourceID, Request, QueryOpts), Ref}.
  %% Waits up to Timeout for a reply tagged with Ref.
  %% Accepts both `{result, Ref, Reply}` and `{Ref, Reply}` messages.
  %% Returns {ok, Reply}, or `timeout` if nothing matching arrives in time.
  receive_result(Ref, Timeout) ->
      receive
          {result, Ref, Reply} -> {ok, Reply};
          {Ref, Reply} -> {ok, Reply}
      after Timeout -> timeout
      end.
  %% Opens a direct epgsql connection to the host/port stored in Config,
  %% using the suite's database credentials. TLS options are added when
  %% `enable_tls` is true. Crashes (badmatch) if the connection fails.
  connect_direct_pgsql(Config) ->
      BaseOpts = #{
          host => ?config(pgsql_host, Config),
          port => ?config(pgsql_port, Config),
          username => ?PGSQL_USERNAME,
          password => ?PGSQL_PASSWORD,
          database => ?PGSQL_DATABASE
      },
      TlsOpts =
          case ?config(enable_tls, Config) of
              false ->
                  #{};
              true ->
                  #{
                      ssl => true,
                      ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
                  }
          end,
      {ok, Con} = epgsql:connect(maps:merge(BaseOpts, TlsOpts)),
      Con.
  283. % These funs connect and then stop the pgsql connection
  %% Connects directly to PostgreSQL, creates the test table if it does not
  %% exist, and closes the connection. Returns `ok`.
  %% The close runs in `after` so the connection is released even when the
  %% assertive match on the query result fails.
  connect_and_create_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Connects directly to PostgreSQL, drops the test table, and closes the
  %% connection. Returns `ok`; crashes (badmatch) if the drop fails.
  %% The close runs in `after` so the connection is released even when the
  %% drop fails.
  connect_and_drop_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Connects directly to PostgreSQL, makes sure the test table exists,
  %% deletes all of its rows, and closes the connection. Returns `ok`.
  %% The close runs in `after` so the connection is released even when the
  %% delete fails.
  connect_and_clear_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          %% Best effort: the result of the create statement is deliberately ignored.
          _ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
          {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Connects directly to PostgreSQL and returns the payload of the single
  %% row in the test table. Crashes (badmatch) unless exactly one row exists.
  %% The close runs in `after` so the connection is released even when that
  %% single-row assertion fails.
  connect_and_get_payload(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, [{Payload}]} = epgsql:squery(Con, ?SQL_SELECT),
          Payload
      after
          ok = epgsql:close(Con)
      end.
  302. %%------------------------------------------------------------------------------
  303. %% Testcases
  304. %%------------------------------------------------------------------------------
  %% Creates the bridge from configuration, publishes one message and checks
  %% that the payload lands in the table and that exactly one
  %% pgsql_connector_query_return event with a single-row result is traced.
  t_setup_via_config_and_publish(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {_, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(Trace) ->
              QueryReturns = ?of_kind(pgsql_connector_query_return, Trace),
              %% The traced result has a different shape with batching enabled.
              case ?config(enable_batch, Config) of
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], QueryReturns);
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], QueryReturns)
              end,
              ok
          end
      ),
      ok.
  %% Creates the bridge through the HTTP API, publishes one message and
  %% checks that the payload lands in the table and that exactly one
  %% pgsql_connector_query_return event with a single-row result is traced.
  %% In sync mode the send itself must also return {ok, 1}.
  t_setup_via_http_api_and_publish(Config) ->
      QueryMode = ?config(query_mode, Config),
      BaseConf = ?config(pgsql_config, Config),
      HttpParams = BaseConf#{
          <<"name">> => ?config(pgsql_name, Config),
          <<"type">> => ?config(pgsql_bridge_type, Config)
      },
      ?assertMatch({ok, _}, create_bridge_http(HttpParams)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {SendResult, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              case QueryMode of
                  sync ->
                      ?assertEqual({ok, 1}, SendResult);
                  async ->
                      ok
              end,
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(Trace) ->
              QueryReturns = ?of_kind(pgsql_connector_query_return, Trace),
              %% The traced result has a different shape with batching enabled.
              case ?config(enable_batch, Config) of
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], QueryReturns);
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], QueryReturns)
              end,
              ok
          end
      ),
      ok.
  %% The health check reports `connected` for a fresh bridge and
  %% `disconnected` or `connecting` while the proxy is down.
  t_get_status(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
      CheckWhileDown = fun() ->
          ?assertMatch(
              {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
              emqx_resource_manager:health_check(ResourceID)
          )
      end,
      emqx_common_test_helpers:with_failure(
          down,
          ?config(proxy_name, Config),
          ?config(proxy_host, Config),
          ?config(proxy_port, Config),
          CheckWhileDown
      ),
      ok.
  %% Creating the bridge while the proxy is down still returns {ok, _}, and
  %% exactly one pgsql_connector_start_failed event carrying a
  %% start_pool_failed error is traced.
  t_create_disconnected(Config) ->
      CreateWhileDown = fun() ->
          ?assertMatch({ok, _}, create_bridge(Config))
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down,
              ?config(proxy_name, Config),
              ?config(proxy_host, Config),
              ?config(proxy_port, Config),
              CreateWhileDown
          ),
          fun(Trace) ->
              StartFailures = ?of_kind(pgsql_connector_start_failed, Trace),
              ?assertMatch([#{error := {start_pool_failed, _, _}}], StartFailures),
              ok
          end
      ),
      ok.
  %% Sending a message while the proxy is down produces a
  %% buffer_worker_flush_nack event whose error is either a resource_error
  %% or {recoverable_error, disconnected}. In sync mode the send itself must
  %% return {error, _}.
  t_write_failure(Config) ->
      QueryMode = ?config(query_mode, Config),
      {ok, _} = create_bridge(Config),
      Message = #{
          payload => integer_to_binary(erlang:unique_integer()),
          timestamp => 1668602148000
      },
      SendWhileDown = fun() ->
          {_, {ok, _}} =
              ?wait_async_action(
                  case QueryMode of
                      async ->
                          send_message(Config, Message);
                      sync ->
                          ?assertMatch({error, _}, send_message(Config, Message))
                  end,
                  #{?snk_kind := buffer_worker_flush_nack},
                  15_000
              )
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down,
              ?config(proxy_name, Config),
              ?config(proxy_host, Config),
              ?config(proxy_port, Config),
              SendWhileDown
          ),
          fun(Trace) ->
              ct:pal("trace: ~p", [Trace]),
              Nacks = ?of_kind(buffer_worker_flush_nack, Trace),
              ?assertMatch([#{result := {error, _}} | _], Nacks),
              %% Only the first nack is inspected.
              [#{result := {error, Reason}} | _] = Nacks,
              case Reason of
                  {recoverable_error, disconnected} ->
                      ok;
                  {resource_error, _} ->
                      ok;
                  _ ->
                      ct:fail("unexpected error: ~p", [Reason])
              end
          end
      ),
      ok.
  %% Issues a write while the proxy injects a `timeout` failure.
  %% Sync mode must end in a resource_error with reason `timeout`; async mode
  %% must deliver either an unrecoverable error or a success reply.
  %% This test doesn't work with batch enabled since it is not possible
  %% to set the timeout directly for batch queries.
  t_write_timeout(Config) ->
      QueryMode = ?config(query_mode, Config),
      ResourceOverrides = #{
          <<"resource_opts">> => #{
              <<"resume_interval">> => <<"100ms">>,
              <<"health_check_interval">> => <<"100ms">>
          }
      },
      {ok, _} = create_bridge(Config, ResourceOverrides),
      Message = #{
          payload => integer_to_binary(erlang:unique_integer()),
          timestamp => 1668602148000
      },
      {ok, SubRef} = snabbkaffe:subscribe(
          ?match_event(#{?snk_kind := call_query_enter}),
          2_000
      ),
      QueryWhileFailing = fun() ->
          QueryResult =
              case QueryMode of
                  sync ->
                      query_resource(Config, {send_message, Message});
                  async ->
                      query_resource_async(
                          Config, {send_message, Message}, #{timeout => 60_000}
                      )
              end,
          %% Make sure the query was actually entered before the failure is lifted.
          ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SubRef)),
          QueryResult
      end,
      Outcome = emqx_common_test_helpers:with_failure(
          timeout,
          ?config(proxy_name, Config),
          ?config(proxy_host, Config),
          ?config(proxy_port, Config),
          QueryWhileFailing
      ),
      case Outcome of
          {_, Ref} when is_reference(Ref) ->
              %% Async path: wait for the reply sent by the callback.
              %% A successful result is possible depending on timing, if the
              %% request is retried after the failure is healed.
              case receive_result(Ref, 15_000) of
                  {ok, {error, {unrecoverable_error, _}}} ->
                      ok;
                  {ok, {ok, _}} ->
                      ok;
                  {ok, Unexpected} ->
                      ct:fail("unexpected result: ~p", [Unexpected]);
                  timeout ->
                      ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
                      ct:fail("no response received")
              end;
          _ ->
              ?assertMatch({error, {resource_error, #{reason := timeout}}}, Outcome)
      end,
      ok.
  %% A raw `{sql, ...}` request returns a single-row result without
  %% batching, and batch_prepare_not_implemented with batching enabled.
  t_simple_sql_query(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"SELECT count(1) AS T">>},
      Outcome =
          case ?config(query_mode, Config) of
              async ->
                  {_, Ref} = query_resource_async(Config, Request),
                  {ok, AsyncReply} = receive_result(Ref, 2_000),
                  AsyncReply;
              sync ->
                  query_resource(Config, Request)
          end,
      case ?config(enable_batch, Config) of
          false ->
              ?assertMatch({ok, _, [{1}]}, Outcome);
          true ->
              ?assertEqual(
                  {error, {unrecoverable_error, batch_prepare_not_implemented}}, Outcome
              )
      end,
      ok.
  %% Sending an empty message is acknowledged with an unrecoverable
  %% not_null_violation error (SQLSTATE 23502).
  t_missing_data(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      EmptyMessage = #{},
      {_, {ok, FlushAck}} =
          ?wait_async_action(
              send_message(Config, EmptyMessage),
              #{?snk_kind := buffer_worker_flush_ack},
              2_000
          ),
      ?assertMatch(
          #{
              result :=
                  {error,
                      {unrecoverable_error,
                          {error, error, <<"23502">>, not_null_violation, _, _}}}
          },
          FlushAck
      ),
      ok.
  %% A request with an empty statement and a bad parameter list yields an
  %% unrecoverable error; with batching enabled it is exactly invalid_request.
  t_bad_sql_parameter(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"">>, [bad_parameter]},
      Outcome =
          case ?config(query_mode, Config) of
              async ->
                  {_, Ref} = query_resource_async(Config, Request),
                  {ok, AsyncReply} = receive_result(Ref, 2_000),
                  AsyncReply;
              sync ->
                  query_resource(Config, Request)
          end,
      case ?config(enable_batch, Config) of
          false ->
              ?assertMatch({error, {unrecoverable_error, _}}, Outcome);
          true ->
              ?assertEqual({error, {unrecoverable_error, invalid_request}}, Outcome)
      end,
      ok.
  %% A payload containing every byte from 1 to 127 is stored and read back
  %% unchanged.
  t_nasty_sql_string(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      NastyPayload = list_to_binary(lists:seq(1, 127)),
      Message = #{
          payload => NastyPayload,
          timestamp => erlang:system_time(millisecond)
      },
      {_, {ok, _}} =
          ?wait_async_action(
              send_message(Config, Message),
              #{?snk_kind := pgsql_connector_query_return},
              1_000
          ),
      Stored = connect_and_get_payload(Config),
      ?assertEqual(NastyPayload, Stored).
  %% With the table dropped before the bridge is created, the health check
  %% settles on `connecting` or `disconnected`, queries fail with
  %% `unhealthy_target`, and exactly one pgsql_undefined_table event is
  %% traced. The table is recreated at the end.
  t_missing_table(Config) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      ?check_trace(
          begin
              connect_and_drop_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertMatch(
                      {ok, Status} when Status == connecting orelse Status == disconnected,
                      emqx_resource_manager:health_check(ResourceID)
                  )
              ),
              Message = #{
                  payload => integer_to_binary(erlang:unique_integer()),
                  timestamp => 1668602148000
              },
              Timeout = 1000,
              Request = {send_message, Message, [], Timeout},
              ?assertMatch(
                  {error, {resource_error, #{reason := unhealthy_target}}},
                  query_resource(Config, Request)
              ),
              ok
          end,
          fun(Trace) ->
              ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
              ok
          end
      ),
      connect_and_create_table(Config),
      ok.
  %% Dropping the table after the bridge is connected makes a sync query fail
  %% with either an undefined_table error (SQLSTATE 42P01) or a not_connected
  %% resource error. The table is recreated at the end.
  t_table_removed(Config) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      ?check_trace(
          begin
              connect_and_create_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
              ),
              connect_and_drop_table(Config),
              Message = #{
                  payload => integer_to_binary(erlang:unique_integer()),
                  timestamp => 1668602148000
              },
              case query_resource_sync(Config, {send_message, Message, []}) of
                  ?RESOURCE_ERROR_M(not_connected, _) ->
                      ok;
                  {error,
                      {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
                      ok;
                  Unexpected ->
                      ct:fail("unexpected result: ~p", [Unexpected])
              end,
              ok
          end,
          []
      ),
      connect_and_create_table(Config),
      ok.
  %% Runs 20 health checks in parallel on a connected bridge; each must report
  %% `connected` and no postgres_connector_bad_parse2 event may be traced.
  t_concurrent_health_checks(Config) ->
      ResourceID = emqx_bridge_resource:resource_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      AssertConnected = fun(_) ->
          ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
      end,
      ?check_trace(
          begin
              connect_and_create_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
              ),
              emqx_utils:pmap(AssertConnected, lists:seq(1, 20)),
              ok
          end,
          fun(Trace) ->
              BadParses = ?of_kind(postgres_connector_bad_parse2, Trace),
              ?assertEqual([], BadParses),
              ok
          end
      ),
      ok.