emqx_bridge_pgsql_SUITE.erl 25 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
  11. % SQL definitions
  12. -define(SQL_BRIDGE,
  13. "INSERT INTO mqtt_test(payload, arrived) "
  14. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  15. ).
  16. -define(SQL_CREATE_TABLE,
  17. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  18. ).
  19. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  20. -define(SQL_DELETE, "DELETE from mqtt_test").
  21. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  22. % DB defaults
  23. -define(PGSQL_DATABASE, "mqtt").
  24. -define(PGSQL_USERNAME, "root").
  25. -define(PGSQL_PASSWORD, "public").
  26. -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. [
  32. {group, tcp},
  33. {group, tls}
  34. ].
  35. groups() ->
  36. TCs = emqx_common_test_helpers:all(?MODULE),
  37. NonBatchCases = [t_write_timeout],
  38. BatchVariantGroups = [
  39. {group, with_batch},
  40. {group, without_batch},
  41. {group, matrix},
  42. {group, timescale}
  43. ],
  44. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  45. [
  46. {tcp, QueryModeGroups},
  47. {tls, QueryModeGroups},
  48. {async, BatchVariantGroups},
  49. {sync, BatchVariantGroups},
  50. {with_batch, TCs -- NonBatchCases},
  51. {without_batch, TCs},
  52. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  53. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  54. ].
  55. init_per_group(tcp, Config) ->
  56. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  57. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  58. [
  59. {pgsql_host, Host},
  60. {pgsql_port, Port},
  61. {enable_tls, false},
  62. {proxy_name, "pgsql_tcp"}
  63. | Config
  64. ];
  65. init_per_group(tls, Config) ->
  66. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  67. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  68. [
  69. {pgsql_host, Host},
  70. {pgsql_port, Port},
  71. {enable_tls, true},
  72. {proxy_name, "pgsql_tls"}
  73. | Config
  74. ];
  75. init_per_group(async, Config) ->
  76. [{query_mode, async} | Config];
  77. init_per_group(sync, Config) ->
  78. [{query_mode, sync} | Config];
  79. init_per_group(with_batch, Config0) ->
  80. Config = [{enable_batch, true} | Config0],
  81. common_init(Config);
  82. init_per_group(without_batch, Config0) ->
  83. Config = [{enable_batch, false} | Config0],
  84. common_init(Config);
  85. init_per_group(matrix, Config0) ->
  86. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  87. common_init(Config);
  88. init_per_group(timescale, Config0) ->
  89. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  90. common_init(Config);
  91. init_per_group(_Group, Config) ->
  92. Config.
  93. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
  94. connect_and_drop_table(Config),
  95. ProxyHost = ?config(proxy_host, Config),
  96. ProxyPort = ?config(proxy_port, Config),
  97. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  98. ok;
  99. end_per_group(_Group, _Config) ->
  100. ok.
  101. init_per_suite(Config) ->
  102. Config.
  103. end_per_suite(_Config) ->
  104. emqx_mgmt_api_test_util:end_suite(),
  105. ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
  106. ok.
  107. init_per_testcase(_Testcase, Config) ->
  108. connect_and_clear_table(Config),
  109. delete_bridge(Config),
  110. snabbkaffe:start_trace(),
  111. Config.
  112. end_per_testcase(_Testcase, Config) ->
  113. ProxyHost = ?config(proxy_host, Config),
  114. ProxyPort = ?config(proxy_port, Config),
  115. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  116. connect_and_clear_table(Config),
  117. ok = snabbkaffe:stop(),
  118. delete_bridge(Config),
  119. ok.
  120. %%------------------------------------------------------------------------------
  121. %% Helper fns
  122. %%------------------------------------------------------------------------------
  123. common_init(Config0) ->
  124. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  125. Host = ?config(pgsql_host, Config0),
  126. Port = ?config(pgsql_port, Config0),
  127. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  128. true ->
  129. % Setup toxiproxy
  130. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  131. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  132. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  133. % Ensure enterprise bridge module is loaded
  134. ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
  135. _ = emqx_bridge_enterprise:module_info(),
  136. emqx_mgmt_api_test_util:init_suite(),
  137. % Connect to pgsql directly and create the table
  138. connect_and_create_table(Config0),
  139. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  140. Config =
  141. [
  142. {pgsql_config, PGConf},
  143. {pgsql_bridge_type, BridgeType},
  144. {pgsql_name, Name},
  145. {proxy_host, ProxyHost},
  146. {proxy_port, ProxyPort}
  147. | Config0
  148. ],
  149. Config;
  150. false ->
  151. case os:getenv("IS_CI") of
  152. "yes" ->
  153. throw(no_pgsql);
  154. _ ->
  155. {skip, no_pgsql}
  156. end
  157. end.
  158. pgsql_config(BridgeType, Config) ->
  159. Port = integer_to_list(?config(pgsql_port, Config)),
  160. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  161. Name = atom_to_binary(?MODULE),
  162. BatchSize =
  163. case ?config(enable_batch, Config) of
  164. true -> ?BATCH_SIZE;
  165. false -> 1
  166. end,
  167. QueryMode = ?config(query_mode, Config),
  168. TlsEnabled = ?config(enable_tls, Config),
  169. %% NOTE: supplying password through a file here, to verify that it works.
  170. Password = create_passfile(BridgeType, Config),
  171. ConfigString =
  172. io_lib:format(
  173. "bridges.~s.~s {"
  174. "\n enable = true"
  175. "\n server = ~p"
  176. "\n database = ~p"
  177. "\n username = ~p"
  178. "\n password = ~p"
  179. "\n sql = ~p"
  180. "\n resource_opts = {"
  181. "\n request_ttl = 500ms"
  182. "\n batch_size = ~b"
  183. "\n query_mode = ~s"
  184. "\n }"
  185. "\n ssl = {"
  186. "\n enable = ~w"
  187. "\n }"
  188. "\n }",
  189. [
  190. BridgeType,
  191. Name,
  192. Server,
  193. ?PGSQL_DATABASE,
  194. ?PGSQL_USERNAME,
  195. Password,
  196. ?SQL_BRIDGE,
  197. BatchSize,
  198. QueryMode,
  199. TlsEnabled
  200. ]
  201. ),
  202. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  203. create_passfile(BridgeType, Config) ->
  204. Filename = binary_to_list(BridgeType) ++ ".passfile",
  205. Filepath = filename:join(?config(priv_dir, Config), Filename),
  206. ok = file:write_file(Filepath, ?PGSQL_PASSWORD),
  207. "file://" ++ Filepath.
  208. parse_and_check(ConfigString, BridgeType, Name) ->
  209. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  210. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  211. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  212. Config.
  213. create_bridge(Config) ->
  214. create_bridge(Config, _Overrides = #{}).
  215. create_bridge(Config, Overrides) ->
  216. BridgeType = ?config(pgsql_bridge_type, Config),
  217. Name = ?config(pgsql_name, Config),
  218. PGConfig0 = ?config(pgsql_config, Config),
  219. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  220. emqx_bridge:create(BridgeType, Name, PGConfig).
  221. delete_bridge(Config) ->
  222. BridgeType = ?config(pgsql_bridge_type, Config),
  223. Name = ?config(pgsql_name, Config),
  224. emqx_bridge:remove(BridgeType, Name).
  225. create_bridge_http(Params) ->
  226. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  227. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  228. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  229. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  230. Error -> Error
  231. end.
  232. send_message(Config, Payload) ->
  233. Name = ?config(pgsql_name, Config),
  234. BridgeType = ?config(pgsql_bridge_type, Config),
  235. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  236. emqx_bridge:send_message(BridgeID, Payload).
  237. query_resource(Config, Request) ->
  238. Name = ?config(pgsql_name, Config),
  239. BridgeType = ?config(pgsql_bridge_type, Config),
  240. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  241. emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
  242. query_resource_sync(Config, Request) ->
  243. Name = ?config(pgsql_name, Config),
  244. BridgeType = ?config(pgsql_bridge_type, Config),
  245. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  246. emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
  247. query_resource_async(Config, Request) ->
  248. query_resource_async(Config, Request, _Opts = #{}).
  249. query_resource_async(Config, Request, Opts) ->
  250. Name = ?config(pgsql_name, Config),
  251. BridgeType = ?config(pgsql_bridge_type, Config),
  252. Ref = alias([reply]),
  253. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  254. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  255. Timeout = maps:get(timeout, Opts, 500),
  256. Return = emqx_resource:query(ResourceID, Request, #{
  257. timeout => Timeout,
  258. async_reply_fun => {AsyncReplyFun, []}
  259. }),
  260. {Return, Ref}.
  261. receive_result(Ref, Timeout) ->
  262. receive
  263. {result, Ref, Result} ->
  264. {ok, Result};
  265. {Ref, Result} ->
  266. {ok, Result}
  267. after Timeout ->
  268. timeout
  269. end.
  270. connect_direct_pgsql(Config) ->
  271. Opts = #{
  272. host => ?config(pgsql_host, Config),
  273. port => ?config(pgsql_port, Config),
  274. username => ?PGSQL_USERNAME,
  275. password => ?PGSQL_PASSWORD,
  276. database => ?PGSQL_DATABASE
  277. },
  278. SslOpts =
  279. case ?config(enable_tls, Config) of
  280. true ->
  281. Opts#{
  282. ssl => true,
  283. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  284. };
  285. false ->
  286. Opts
  287. end,
  288. {ok, Con} = epgsql:connect(SslOpts),
  289. Con.
  290. % These funs connect and then stop the pgsql connection
  291. connect_and_create_table(Config) ->
  292. Con = connect_direct_pgsql(Config),
  293. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  294. ok = epgsql:close(Con).
  295. connect_and_drop_table(Config) ->
  296. Con = connect_direct_pgsql(Config),
  297. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  298. ok = epgsql:close(Con).
  299. connect_and_clear_table(Config) ->
  300. Con = connect_direct_pgsql(Config),
  301. _ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  302. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  303. ok = epgsql:close(Con).
  304. connect_and_get_payload(Config) ->
  305. Con = connect_direct_pgsql(Config),
  306. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  307. ok = epgsql:close(Con),
  308. Result.
  309. %%------------------------------------------------------------------------------
  310. %% Testcases
  311. %%------------------------------------------------------------------------------
  312. t_setup_via_config_and_publish(Config) ->
  313. ?assertMatch(
  314. {ok, _},
  315. create_bridge(Config)
  316. ),
  317. Val = integer_to_binary(erlang:unique_integer()),
  318. SentData = #{payload => Val, timestamp => 1668602148000},
  319. ?check_trace(
  320. begin
  321. {_, {ok, _}} =
  322. ?wait_async_action(
  323. send_message(Config, SentData),
  324. #{?snk_kind := pgsql_connector_query_return},
  325. 10_000
  326. ),
  327. ?assertMatch(
  328. Val,
  329. connect_and_get_payload(Config)
  330. ),
  331. ok
  332. end,
  333. fun(Trace0) ->
  334. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  335. case ?config(enable_batch, Config) of
  336. true ->
  337. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  338. false ->
  339. ?assertMatch([#{result := {ok, 1}}], Trace)
  340. end,
  341. ok
  342. end
  343. ),
  344. ok.
  345. t_setup_via_http_api_and_publish(Config) ->
  346. BridgeType = ?config(pgsql_bridge_type, Config),
  347. Name = ?config(pgsql_name, Config),
  348. PgsqlConfig0 = ?config(pgsql_config, Config),
  349. QueryMode = ?config(query_mode, Config),
  350. PgsqlConfig = PgsqlConfig0#{
  351. <<"name">> => Name,
  352. <<"type">> => BridgeType,
  353. %% NOTE: using literal passwords with HTTP API requests.
  354. <<"password">> => <<?PGSQL_PASSWORD>>
  355. },
  356. ?assertMatch(
  357. {ok, _},
  358. create_bridge_http(PgsqlConfig)
  359. ),
  360. Val = integer_to_binary(erlang:unique_integer()),
  361. SentData = #{payload => Val, timestamp => 1668602148000},
  362. ?check_trace(
  363. begin
  364. {Res, {ok, _}} =
  365. ?wait_async_action(
  366. send_message(Config, SentData),
  367. #{?snk_kind := pgsql_connector_query_return},
  368. 10_000
  369. ),
  370. case QueryMode of
  371. async ->
  372. ok;
  373. sync ->
  374. ?assertEqual({ok, 1}, Res)
  375. end,
  376. ?assertMatch(
  377. Val,
  378. connect_and_get_payload(Config)
  379. ),
  380. ok
  381. end,
  382. fun(Trace0) ->
  383. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  384. case ?config(enable_batch, Config) of
  385. true ->
  386. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  387. false ->
  388. ?assertMatch([#{result := {ok, 1}}], Trace)
  389. end,
  390. ok
  391. end
  392. ),
  393. ok.
  394. t_get_status(Config) ->
  395. ?assertMatch(
  396. {ok, _},
  397. create_bridge(Config)
  398. ),
  399. ProxyPort = ?config(proxy_port, Config),
  400. ProxyHost = ?config(proxy_host, Config),
  401. ProxyName = ?config(proxy_name, Config),
  402. Name = ?config(pgsql_name, Config),
  403. BridgeType = ?config(pgsql_bridge_type, Config),
  404. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  405. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
  406. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  407. ?assertMatch(
  408. {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
  409. emqx_resource_manager:health_check(ResourceID)
  410. )
  411. end),
  412. ok.
  413. t_create_disconnected(Config) ->
  414. ProxyPort = ?config(proxy_port, Config),
  415. ProxyHost = ?config(proxy_host, Config),
  416. ProxyName = ?config(proxy_name, Config),
  417. ?check_trace(
  418. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  419. ?assertMatch({ok, _}, create_bridge(Config))
  420. end),
  421. fun(Trace) ->
  422. ?assertMatch(
  423. [#{error := {start_pool_failed, _, _}}],
  424. ?of_kind(pgsql_connector_start_failed, Trace)
  425. ),
  426. ok
  427. end
  428. ),
  429. ok.
  430. t_write_failure(Config) ->
  431. ProxyName = ?config(proxy_name, Config),
  432. ProxyPort = ?config(proxy_port, Config),
  433. ProxyHost = ?config(proxy_host, Config),
  434. QueryMode = ?config(query_mode, Config),
  435. {ok, _} = create_bridge(Config),
  436. Val = integer_to_binary(erlang:unique_integer()),
  437. SentData = #{payload => Val, timestamp => 1668602148000},
  438. ?check_trace(
  439. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  440. {_, {ok, _}} =
  441. ?wait_async_action(
  442. case QueryMode of
  443. sync ->
  444. ?assertMatch({error, _}, send_message(Config, SentData));
  445. async ->
  446. send_message(Config, SentData)
  447. end,
  448. #{?snk_kind := buffer_worker_flush_nack},
  449. 15_000
  450. )
  451. end),
  452. fun(Trace0) ->
  453. ct:pal("trace: ~p", [Trace0]),
  454. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  455. ?assertMatch([#{result := {error, _}} | _], Trace),
  456. [#{result := {error, Error}} | _] = Trace,
  457. case Error of
  458. {resource_error, _} ->
  459. ok;
  460. {recoverable_error, disconnected} ->
  461. ok;
  462. _ ->
  463. ct:fail("unexpected error: ~p", [Error])
  464. end
  465. end
  466. ),
  467. ok.
  468. % This test doesn't work with batch enabled since it is not possible
  469. % to set the timeout directly for batch queries
  470. t_write_timeout(Config) ->
  471. ProxyName = ?config(proxy_name, Config),
  472. ProxyPort = ?config(proxy_port, Config),
  473. ProxyHost = ?config(proxy_host, Config),
  474. QueryMode = ?config(query_mode, Config),
  475. {ok, _} = create_bridge(
  476. Config,
  477. #{
  478. <<"resource_opts">> => #{
  479. <<"resume_interval">> => <<"100ms">>,
  480. <<"health_check_interval">> => <<"100ms">>
  481. }
  482. }
  483. ),
  484. Val = integer_to_binary(erlang:unique_integer()),
  485. SentData = #{payload => Val, timestamp => 1668602148000},
  486. {ok, SRef} = snabbkaffe:subscribe(
  487. ?match_event(#{?snk_kind := call_query_enter}),
  488. 2_000
  489. ),
  490. Res0 =
  491. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  492. Res1 =
  493. case QueryMode of
  494. async ->
  495. query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
  496. sync ->
  497. query_resource(Config, {send_message, SentData})
  498. end,
  499. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  500. Res1
  501. end),
  502. case Res0 of
  503. {_, Ref} when is_reference(Ref) ->
  504. case receive_result(Ref, 15_000) of
  505. {ok, Res} ->
  506. %% we may receive a successful result depending on
  507. %% timing, if the request is retried after the
  508. %% failure is healed.
  509. case Res of
  510. {error, {unrecoverable_error, _}} ->
  511. ok;
  512. {ok, _} ->
  513. ok;
  514. _ ->
  515. ct:fail("unexpected result: ~p", [Res])
  516. end;
  517. timeout ->
  518. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  519. ct:fail("no response received")
  520. end;
  521. _ ->
  522. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  523. end,
  524. ok.
  525. t_simple_sql_query(Config) ->
  526. EnableBatch = ?config(enable_batch, Config),
  527. QueryMode = ?config(query_mode, Config),
  528. ?assertMatch(
  529. {ok, _},
  530. create_bridge(Config)
  531. ),
  532. Request = {sql, <<"SELECT count(1) AS T">>},
  533. Result =
  534. case QueryMode of
  535. sync ->
  536. query_resource(Config, Request);
  537. async ->
  538. {_, Ref} = query_resource_async(Config, Request),
  539. {ok, Res} = receive_result(Ref, 2_000),
  540. Res
  541. end,
  542. case EnableBatch of
  543. true ->
  544. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  545. false ->
  546. ?assertMatch({ok, _, [{1}]}, Result)
  547. end,
  548. ok.
  549. t_missing_data(Config) ->
  550. ?assertMatch(
  551. {ok, _},
  552. create_bridge(Config)
  553. ),
  554. {_, {ok, Event}} =
  555. ?wait_async_action(
  556. send_message(Config, #{}),
  557. #{?snk_kind := buffer_worker_flush_ack},
  558. 2_000
  559. ),
  560. ?assertMatch(
  561. #{
  562. result :=
  563. {error,
  564. {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
  565. },
  566. Event
  567. ),
  568. ok.
  569. t_bad_sql_parameter(Config) ->
  570. QueryMode = ?config(query_mode, Config),
  571. EnableBatch = ?config(enable_batch, Config),
  572. ?assertMatch(
  573. {ok, _},
  574. create_bridge(Config)
  575. ),
  576. Request = {sql, <<"">>, [bad_parameter]},
  577. Result =
  578. case QueryMode of
  579. sync ->
  580. query_resource(Config, Request);
  581. async ->
  582. {_, Ref} = query_resource_async(Config, Request),
  583. {ok, Res} = receive_result(Ref, 2_000),
  584. Res
  585. end,
  586. case EnableBatch of
  587. true ->
  588. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  589. false ->
  590. ?assertMatch(
  591. {error, {unrecoverable_error, _}}, Result
  592. )
  593. end,
  594. ok.
  595. t_nasty_sql_string(Config) ->
  596. ?assertMatch({ok, _}, create_bridge(Config)),
  597. Payload = list_to_binary(lists:seq(1, 127)),
  598. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  599. {_, {ok, _}} =
  600. ?wait_async_action(
  601. send_message(Config, Message),
  602. #{?snk_kind := pgsql_connector_query_return},
  603. 1_000
  604. ),
  605. ?assertEqual(Payload, connect_and_get_payload(Config)).
  606. t_missing_table(Config) ->
  607. Name = ?config(pgsql_name, Config),
  608. BridgeType = ?config(pgsql_bridge_type, Config),
  609. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  610. ?check_trace(
  611. begin
  612. connect_and_drop_table(Config),
  613. ?assertMatch({ok, _}, create_bridge(Config)),
  614. ?retry(
  615. _Sleep = 1_000,
  616. _Attempts = 20,
  617. ?assertMatch(
  618. {ok, Status} when Status == connecting orelse Status == disconnected,
  619. emqx_resource_manager:health_check(ResourceID)
  620. )
  621. ),
  622. Val = integer_to_binary(erlang:unique_integer()),
  623. SentData = #{payload => Val, timestamp => 1668602148000},
  624. Timeout = 1000,
  625. ?assertMatch(
  626. {error, {resource_error, #{reason := unhealthy_target}}},
  627. query_resource(Config, {send_message, SentData, [], Timeout})
  628. ),
  629. ok
  630. end,
  631. fun(Trace) ->
  632. ?assertMatch([_], ?of_kind(pgsql_undefined_table, Trace)),
  633. ok
  634. end
  635. ),
  636. connect_and_create_table(Config),
  637. ok.
  638. t_table_removed(Config) ->
  639. Name = ?config(pgsql_name, Config),
  640. BridgeType = ?config(pgsql_bridge_type, Config),
  641. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  642. ?check_trace(
  643. begin
  644. connect_and_create_table(Config),
  645. ?assertMatch({ok, _}, create_bridge(Config)),
  646. ?retry(
  647. _Sleep = 1_000,
  648. _Attempts = 20,
  649. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  650. ),
  651. connect_and_drop_table(Config),
  652. Val = integer_to_binary(erlang:unique_integer()),
  653. SentData = #{payload => Val, timestamp => 1668602148000},
  654. case query_resource_sync(Config, {send_message, SentData, []}) of
  655. {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
  656. ok;
  657. ?RESOURCE_ERROR_M(not_connected, _) ->
  658. ok;
  659. Res ->
  660. ct:fail("unexpected result: ~p", [Res])
  661. end,
  662. ok
  663. end,
  664. []
  665. ),
  666. connect_and_create_table(Config),
  667. ok.
  668. t_concurrent_health_checks(Config) ->
  669. Name = ?config(pgsql_name, Config),
  670. BridgeType = ?config(pgsql_bridge_type, Config),
  671. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  672. ?check_trace(
  673. begin
  674. connect_and_create_table(Config),
  675. ?assertMatch({ok, _}, create_bridge(Config)),
  676. ?retry(
  677. _Sleep = 1_000,
  678. _Attempts = 20,
  679. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  680. ),
  681. emqx_utils:pmap(
  682. fun(_) ->
  683. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  684. end,
  685. lists:seq(1, 20)
  686. ),
  687. ok
  688. end,
  689. fun(Trace) ->
  690. ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
  691. ok
  692. end
  693. ),
  694. ok.