emqx_bridge_pgsql_SUITE.erl 26 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
  11. % SQL definitions
  12. -define(SQL_BRIDGE,
  13. "INSERT INTO mqtt_test(payload, arrived) "
  14. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  15. ).
  16. -define(SQL_CREATE_TABLE,
  17. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  18. ).
  19. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  20. -define(SQL_DELETE, "DELETE from mqtt_test").
  21. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  22. % DB defaults
  23. -define(PGSQL_DATABASE, "mqtt").
  24. -define(PGSQL_USERNAME, "root").
  25. -define(PGSQL_PASSWORD, "public").
  26. -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. [
  32. {group, tcp},
  33. {group, tls}
  34. ].
  35. groups() ->
  36. TCs = emqx_common_test_helpers:all(?MODULE),
  37. NonBatchCases = [t_write_timeout],
  38. BatchVariantGroups = [
  39. {group, with_batch},
  40. {group, without_batch},
  41. {group, matrix},
  42. {group, timescale}
  43. ],
  44. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  45. [
  46. {tcp, QueryModeGroups},
  47. {tls, QueryModeGroups},
  48. {async, BatchVariantGroups},
  49. {sync, BatchVariantGroups},
  50. {with_batch, TCs -- NonBatchCases},
  51. {without_batch, TCs},
  52. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  53. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  54. ].
  55. init_per_group(tcp, Config) ->
  56. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  57. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  58. [
  59. {pgsql_host, Host},
  60. {pgsql_port, Port},
  61. {enable_tls, false},
  62. {proxy_name, "pgsql_tcp"}
  63. | Config
  64. ];
  65. init_per_group(tls, Config) ->
  66. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  67. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  68. [
  69. {pgsql_host, Host},
  70. {pgsql_port, Port},
  71. {enable_tls, true},
  72. {proxy_name, "pgsql_tls"}
  73. | Config
  74. ];
  75. init_per_group(async, Config) ->
  76. [{query_mode, async} | Config];
  77. init_per_group(sync, Config) ->
  78. [{query_mode, sync} | Config];
  79. init_per_group(with_batch, Config0) ->
  80. Config = [{enable_batch, true} | Config0],
  81. common_init(Config);
  82. init_per_group(without_batch, Config0) ->
  83. Config = [{enable_batch, false} | Config0],
  84. common_init(Config);
  85. init_per_group(matrix, Config0) ->
  86. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  87. common_init(Config);
  88. init_per_group(timescale, Config0) ->
  89. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  90. common_init(Config);
  91. init_per_group(_Group, Config) ->
  92. Config.
  93. end_per_group(Group, Config) when
  94. Group =:= with_batch;
  95. Group =:= without_batch;
  96. Group =:= matrix;
  97. Group =:= timescale
  98. ->
  99. Apps = ?config(apps, Config),
  100. ProxyHost = ?config(proxy_host, Config),
  101. ProxyPort = ?config(proxy_port, Config),
  102. connect_and_drop_table(Config),
  103. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  104. ok = emqx_cth_suite:stop(Apps),
  105. ok;
  106. end_per_group(_Group, _Config) ->
  107. ok.
  108. init_per_suite(Config) ->
  109. Config.
  110. end_per_suite(_Config) ->
  111. ok.
  112. init_per_testcase(_Testcase, Config) ->
  113. connect_and_clear_table(Config),
  114. delete_bridge(Config),
  115. snabbkaffe:start_trace(),
  116. Config.
  117. end_per_testcase(_Testcase, Config) ->
  118. ProxyHost = ?config(proxy_host, Config),
  119. ProxyPort = ?config(proxy_port, Config),
  120. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  121. connect_and_clear_table(Config),
  122. ok = snabbkaffe:stop(),
  123. delete_bridge(Config),
  124. ok.
  125. %%------------------------------------------------------------------------------
  126. %% Helper fns
  127. %%------------------------------------------------------------------------------
  128. common_init(Config0) ->
  129. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  130. Host = ?config(pgsql_host, Config0),
  131. Port = ?config(pgsql_port, Config0),
  132. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  133. true ->
  134. % Setup toxiproxy
  135. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  136. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  137. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  138. % Ensure enterprise bridge module is loaded
  139. Apps = emqx_cth_suite:start(
  140. [
  141. emqx,
  142. emqx_conf,
  143. emqx_connector,
  144. emqx_bridge,
  145. emqx_bridge_pgsql,
  146. emqx_rule_engine,
  147. emqx_management,
  148. {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
  149. ],
  150. #{work_dir => emqx_cth_suite:work_dir(Config0)}
  151. ),
  152. {ok, _Api} = emqx_common_test_http:create_default_app(),
  153. %% ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
  154. %% _ = emqx_bridge_enterprise:module_info(),
  155. %% emqx_mgmt_api_test_util:init_suite(),
  156. % Connect to pgsql directly and create the table
  157. connect_and_create_table(Config0),
  158. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  159. Config =
  160. [
  161. {apps, Apps},
  162. {pgsql_config, PGConf},
  163. {pgsql_bridge_type, BridgeType},
  164. {pgsql_name, Name},
  165. {proxy_host, ProxyHost},
  166. {proxy_port, ProxyPort}
  167. | Config0
  168. ],
  169. Config;
  170. false ->
  171. case os:getenv("IS_CI") of
  172. "yes" ->
  173. throw(no_pgsql);
  174. _ ->
  175. {skip, no_pgsql}
  176. end
  177. end.
  178. pgsql_config(BridgeType, Config) ->
  179. Port = integer_to_list(?config(pgsql_port, Config)),
  180. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  181. Name = atom_to_binary(?MODULE),
  182. BatchSize =
  183. case ?config(enable_batch, Config) of
  184. true -> ?BATCH_SIZE;
  185. false -> 1
  186. end,
  187. QueryMode = ?config(query_mode, Config),
  188. TlsEnabled = ?config(enable_tls, Config),
  189. %% NOTE: supplying password through a file here, to verify that it works.
  190. Password = create_passfile(BridgeType, Config),
  191. ConfigString =
  192. io_lib:format(
  193. "bridges.~s.~s {"
  194. "\n enable = true"
  195. "\n server = ~p"
  196. "\n database = ~p"
  197. "\n username = ~p"
  198. "\n password = ~p"
  199. "\n sql = ~p"
  200. "\n resource_opts = {"
  201. "\n request_ttl = 500ms"
  202. "\n batch_size = ~b"
  203. "\n query_mode = ~s"
  204. "\n worker_pool_size = 1"
  205. "\n health_check_interval = 15s"
  206. "\n start_after_created = true"
  207. "\n start_timeout = 5s"
  208. "\n inflight_window = 100"
  209. "\n max_buffer_bytes = 256MB"
  210. "\n buffer_seg_bytes = 10MB"
  211. "\n buffer_mode = memory_only"
  212. "\n metrics_flush_interval = 5s"
  213. "\n resume_interval = 15s"
  214. "\n }"
  215. "\n ssl = {"
  216. "\n enable = ~w"
  217. "\n }"
  218. "\n }",
  219. [
  220. BridgeType,
  221. Name,
  222. Server,
  223. ?PGSQL_DATABASE,
  224. ?PGSQL_USERNAME,
  225. Password,
  226. ?SQL_BRIDGE,
  227. BatchSize,
  228. QueryMode,
  229. TlsEnabled
  230. ]
  231. ),
  232. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  233. default_sql() ->
  234. ?SQL_BRIDGE.
  235. create_passfile(BridgeType, Config) ->
  236. Filename = binary_to_list(BridgeType) ++ ".passfile",
  237. Filepath = filename:join(?config(priv_dir, Config), Filename),
  238. ok = file:write_file(Filepath, ?PGSQL_PASSWORD),
  239. "file://" ++ Filepath.
  240. parse_and_check(ConfigString, BridgeType, Name) ->
  241. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  242. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  243. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  244. Config.
  245. create_bridge(Config) ->
  246. create_bridge(Config, _Overrides = #{}).
  247. create_bridge(Config, Overrides) ->
  248. BridgeType = ?config(pgsql_bridge_type, Config),
  249. Name = ?config(pgsql_name, Config),
  250. PGConfig0 = ?config(pgsql_config, Config),
  251. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  252. emqx_bridge:create(BridgeType, Name, PGConfig).
  253. delete_bridge(Config) ->
  254. BridgeType = ?config(pgsql_bridge_type, Config),
  255. Name = ?config(pgsql_name, Config),
  256. emqx_bridge:remove(BridgeType, Name).
  257. create_bridge_http(Params) ->
  258. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  259. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  260. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  261. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  262. Error -> Error
  263. end.
  264. send_message(Config, Payload) ->
  265. Name = ?config(pgsql_name, Config),
  266. BridgeType = ?config(pgsql_bridge_type, Config),
  267. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  268. emqx_bridge:send_message(BridgeID, Payload).
  269. query_resource(Config, Msg = _Request) ->
  270. Name = ?config(pgsql_name, Config),
  271. BridgeType = ?config(pgsql_bridge_type, Config),
  272. emqx_bridge_v2:query(BridgeType, Name, Msg, #{timeout => 1_000}).
  273. query_resource_sync(Config, Request) ->
  274. Name = ?config(pgsql_name, Config),
  275. BridgeType = ?config(pgsql_bridge_type, Config),
  276. ActionId = emqx_bridge_v2:id(BridgeType, Name),
  277. emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
  278. query_resource_async(Config, Request) ->
  279. query_resource_async(Config, Request, _Opts = #{}).
  280. query_resource_async(Config, Request, Opts) ->
  281. Name = ?config(pgsql_name, Config),
  282. BridgeType = ?config(pgsql_bridge_type, Config),
  283. Ref = alias([reply]),
  284. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  285. Timeout = maps:get(timeout, Opts, 500),
  286. Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
  287. timeout => Timeout,
  288. async_reply_fun => {AsyncReplyFun, []}
  289. }),
  290. {Return, Ref}.
  291. receive_result(Ref, Timeout) ->
  292. receive
  293. {result, Ref, Result} ->
  294. {ok, Result};
  295. {Ref, Result} ->
  296. {ok, Result}
  297. after Timeout ->
  298. timeout
  299. end.
  300. connect_direct_pgsql(Config) ->
  301. Opts = #{
  302. host => ?config(pgsql_host, Config),
  303. port => ?config(pgsql_port, Config),
  304. username => ?PGSQL_USERNAME,
  305. password => ?PGSQL_PASSWORD,
  306. database => ?PGSQL_DATABASE
  307. },
  308. SslOpts =
  309. case ?config(enable_tls, Config) of
  310. true ->
  311. Opts#{
  312. ssl => true,
  313. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  314. };
  315. false ->
  316. Opts
  317. end,
  318. {ok, Con} = epgsql:connect(SslOpts),
  319. Con.
  320. % These funs connect and then stop the pgsql connection
  321. connect_and_create_table(Config) ->
  322. Con = connect_direct_pgsql(Config),
  323. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  324. ok = epgsql:close(Con).
  325. connect_and_drop_table(Config) ->
  326. Con = connect_direct_pgsql(Config),
  327. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  328. ok = epgsql:close(Con).
  329. connect_and_clear_table(Config) ->
  330. Con = connect_direct_pgsql(Config),
  331. _ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  332. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  333. ok = epgsql:close(Con).
  334. connect_and_get_payload(Config) ->
  335. Con = connect_direct_pgsql(Config),
  336. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  337. ok = epgsql:close(Con),
  338. Result.
  339. %%------------------------------------------------------------------------------
  340. %% Testcases
  341. %%------------------------------------------------------------------------------
  342. t_setup_via_config_and_publish(Config) ->
  343. ?assertMatch(
  344. {ok, _},
  345. create_bridge(Config)
  346. ),
  347. Val = integer_to_binary(erlang:unique_integer()),
  348. SentData = #{payload => Val, timestamp => 1668602148000},
  349. ?check_trace(
  350. begin
  351. {_, {ok, _}} =
  352. ?wait_async_action(
  353. send_message(Config, SentData),
  354. #{?snk_kind := pgsql_connector_query_return},
  355. 10_000
  356. ),
  357. ?assertMatch(
  358. Val,
  359. connect_and_get_payload(Config)
  360. ),
  361. ok
  362. end,
  363. fun(Trace0) ->
  364. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  365. case ?config(enable_batch, Config) of
  366. true ->
  367. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  368. false ->
  369. ?assertMatch([#{result := {ok, 1}}], Trace)
  370. end,
  371. ok
  372. end
  373. ),
  374. ok.
  375. t_setup_via_http_api_and_publish(Config) ->
  376. BridgeType = ?config(pgsql_bridge_type, Config),
  377. Name = ?config(pgsql_name, Config),
  378. PgsqlConfig0 = ?config(pgsql_config, Config),
  379. QueryMode = ?config(query_mode, Config),
  380. PgsqlConfig = PgsqlConfig0#{
  381. <<"name">> => Name,
  382. <<"type">> => BridgeType,
  383. %% NOTE: using literal passwords with HTTP API requests.
  384. <<"password">> => <<?PGSQL_PASSWORD>>
  385. },
  386. ?assertMatch(
  387. {ok, _},
  388. create_bridge_http(PgsqlConfig)
  389. ),
  390. Val = integer_to_binary(erlang:unique_integer()),
  391. SentData = #{payload => Val, timestamp => 1668602148000},
  392. ?check_trace(
  393. begin
  394. {Res, {ok, _}} =
  395. ?wait_async_action(
  396. send_message(Config, SentData),
  397. #{?snk_kind := pgsql_connector_query_return},
  398. 10_000
  399. ),
  400. case QueryMode of
  401. async ->
  402. ok;
  403. sync ->
  404. ?assertEqual({ok, 1}, Res)
  405. end,
  406. ?assertMatch(
  407. Val,
  408. connect_and_get_payload(Config)
  409. ),
  410. ok
  411. end,
  412. fun(Trace0) ->
  413. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  414. case ?config(enable_batch, Config) of
  415. true ->
  416. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  417. false ->
  418. ?assertMatch([#{result := {ok, 1}}], Trace)
  419. end,
  420. ok
  421. end
  422. ),
  423. ok.
  424. t_get_status(Config) ->
  425. ?assertMatch(
  426. {ok, _},
  427. create_bridge(Config)
  428. ),
  429. ProxyPort = ?config(proxy_port, Config),
  430. ProxyHost = ?config(proxy_host, Config),
  431. ProxyName = ?config(proxy_name, Config),
  432. Name = ?config(pgsql_name, Config),
  433. BridgeType = ?config(pgsql_bridge_type, Config),
  434. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
  435. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  436. ?assertMatch(
  437. #{status := Status} when Status =:= disconnected orelse Status =:= connecting,
  438. emqx_bridge_v2:health_check(BridgeType, Name)
  439. )
  440. end),
  441. ok.
  442. t_create_disconnected(Config) ->
  443. ProxyPort = ?config(proxy_port, Config),
  444. ProxyHost = ?config(proxy_host, Config),
  445. ProxyName = ?config(proxy_name, Config),
  446. ?check_trace(
  447. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  448. ?assertMatch({ok, _}, create_bridge(Config))
  449. end),
  450. fun(Trace) ->
  451. ?assertMatch(
  452. [#{error := {start_pool_failed, _, _}}],
  453. ?of_kind(pgsql_connector_start_failed, Trace)
  454. ),
  455. ok
  456. end
  457. ),
  458. ok.
  459. t_write_failure(Config) ->
  460. ProxyName = ?config(proxy_name, Config),
  461. ProxyPort = ?config(proxy_port, Config),
  462. ProxyHost = ?config(proxy_host, Config),
  463. QueryMode = ?config(query_mode, Config),
  464. {ok, _} = create_bridge(Config),
  465. Val = integer_to_binary(erlang:unique_integer()),
  466. SentData = #{payload => Val, timestamp => 1668602148000},
  467. ?check_trace(
  468. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  469. {_, {ok, _}} =
  470. ?wait_async_action(
  471. case QueryMode of
  472. sync ->
  473. ?assertMatch({error, _}, send_message(Config, SentData));
  474. async ->
  475. send_message(Config, SentData)
  476. end,
  477. #{?snk_kind := buffer_worker_flush_nack},
  478. 15_000
  479. )
  480. end),
  481. fun(Trace0) ->
  482. ct:pal("trace: ~p", [Trace0]),
  483. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  484. ?assertMatch([#{result := {error, _}} | _], Trace),
  485. [#{result := {error, Error}} | _] = Trace,
  486. case Error of
  487. {resource_error, _} ->
  488. ok;
  489. {recoverable_error, disconnected} ->
  490. ok;
  491. _ ->
  492. ct:fail("unexpected error: ~p", [Error])
  493. end
  494. end
  495. ),
  496. ok.
  497. % This test doesn't work with batch enabled since it is not possible
  498. % to set the timeout directly for batch queries
  499. t_write_timeout(Config) ->
  500. ProxyName = ?config(proxy_name, Config),
  501. ProxyPort = ?config(proxy_port, Config),
  502. ProxyHost = ?config(proxy_host, Config),
  503. QueryMode = ?config(query_mode, Config),
  504. {ok, _} = create_bridge(
  505. Config,
  506. #{
  507. <<"resource_opts">> => #{
  508. <<"resume_interval">> => <<"100ms">>,
  509. <<"health_check_interval">> => <<"100ms">>
  510. }
  511. }
  512. ),
  513. Val = integer_to_binary(erlang:unique_integer()),
  514. SentData = #{payload => Val, timestamp => 1668602148000},
  515. {ok, SRef} = snabbkaffe:subscribe(
  516. ?match_event(#{?snk_kind := call_query_enter}),
  517. 2_000
  518. ),
  519. Res0 =
  520. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  521. Res1 =
  522. case QueryMode of
  523. async ->
  524. query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
  525. sync ->
  526. query_resource(Config, {send_message, SentData})
  527. end,
  528. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  529. Res1
  530. end),
  531. case Res0 of
  532. {_, Ref} when is_reference(Ref) ->
  533. case receive_result(Ref, 15_000) of
  534. {ok, Res} ->
  535. %% we may receive a successful result depending on
  536. %% timing, if the request is retried after the
  537. %% failure is healed.
  538. case Res of
  539. {error, {unrecoverable_error, _}} ->
  540. ok;
  541. {ok, _} ->
  542. ok;
  543. _ ->
  544. ct:fail("unexpected result: ~p", [Res])
  545. end;
  546. timeout ->
  547. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  548. ct:fail("no response received")
  549. end;
  550. _ ->
  551. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  552. end,
  553. ok.
  554. t_simple_sql_query(Config) ->
  555. EnableBatch = ?config(enable_batch, Config),
  556. QueryMode = ?config(query_mode, Config),
  557. ?assertMatch(
  558. {ok, _},
  559. create_bridge(Config)
  560. ),
  561. Request = {sql, <<"SELECT count(1) AS T">>},
  562. Result =
  563. case QueryMode of
  564. sync ->
  565. query_resource(Config, Request);
  566. async ->
  567. {_, Ref} = query_resource_async(Config, Request),
  568. {ok, Res} = receive_result(Ref, 2_000),
  569. Res
  570. end,
  571. case EnableBatch of
  572. true ->
  573. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  574. false ->
  575. ?assertMatch({ok, _, [{1}]}, Result)
  576. end,
  577. ok.
  578. t_missing_data(Config) ->
  579. ?assertMatch(
  580. {ok, _},
  581. create_bridge(Config)
  582. ),
  583. {_, {ok, Event}} =
  584. ?wait_async_action(
  585. send_message(Config, #{}),
  586. #{?snk_kind := buffer_worker_flush_ack},
  587. 2_000
  588. ),
  589. ?assertMatch(
  590. #{
  591. result :=
  592. {error,
  593. {unrecoverable_error, #{
  594. error_code := <<"23502">>,
  595. error_codename := not_null_violation,
  596. severity := error
  597. }}}
  598. },
  599. Event
  600. ),
  601. ok.
  602. t_bad_sql_parameter(Config) ->
  603. QueryMode = ?config(query_mode, Config),
  604. EnableBatch = ?config(enable_batch, Config),
  605. ?assertMatch(
  606. {ok, _},
  607. create_bridge(Config)
  608. ),
  609. Request = {sql, <<"">>, [bad_parameter]},
  610. Result =
  611. case QueryMode of
  612. sync ->
  613. query_resource(Config, Request);
  614. async ->
  615. {_, Ref} = query_resource_async(Config, Request),
  616. {ok, Res} = receive_result(Ref, 2_000),
  617. Res
  618. end,
  619. case EnableBatch of
  620. true ->
  621. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  622. false ->
  623. ?assertMatch(
  624. {error, {unrecoverable_error, _}}, Result
  625. )
  626. end,
  627. ok.
  628. t_nasty_sql_string(Config) ->
  629. ?assertMatch({ok, _}, create_bridge(Config)),
  630. Payload = list_to_binary(lists:seq(1, 127)),
  631. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  632. {_, {ok, _}} =
  633. ?wait_async_action(
  634. send_message(Config, Message),
  635. #{?snk_kind := pgsql_connector_query_return},
  636. 1_000
  637. ),
  638. ?assertEqual(Payload, connect_and_get_payload(Config)).
  639. t_missing_table(Config) ->
  640. Name = ?config(pgsql_name, Config),
  641. BridgeType = ?config(pgsql_bridge_type, Config),
  642. % ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  643. ?check_trace(
  644. begin
  645. connect_and_drop_table(Config),
  646. ?assertMatch({ok, _}, create_bridge(Config)),
  647. ?retry(
  648. _Sleep = 1_000,
  649. _Attempts = 20,
  650. ?assertMatch(
  651. #{status := Status} when Status == connecting orelse Status == disconnected,
  652. emqx_bridge_v2:health_check(BridgeType, Name)
  653. )
  654. ),
  655. Val = integer_to_binary(erlang:unique_integer()),
  656. SentData = #{payload => Val, timestamp => 1668602148000},
  657. ?assertMatch(
  658. {error, {resource_error, #{reason := unhealthy_target}}},
  659. query_resource(Config, {send_message, SentData})
  660. ),
  661. ok
  662. end,
  663. fun(Trace) ->
  664. ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, Trace)),
  665. ok
  666. end
  667. ),
  668. connect_and_create_table(Config),
  669. ok.
  670. t_table_removed(Config) ->
  671. Name = ?config(pgsql_name, Config),
  672. BridgeType = ?config(pgsql_bridge_type, Config),
  673. ?check_trace(
  674. begin
  675. connect_and_create_table(Config),
  676. ?assertMatch({ok, _}, create_bridge(Config)),
  677. ?retry(
  678. _Sleep = 100,
  679. _Attempts = 200,
  680. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
  681. ),
  682. connect_and_drop_table(Config),
  683. Val = integer_to_binary(erlang:unique_integer()),
  684. SentData = #{payload => Val, timestamp => 1668602148000},
  685. ActionId = emqx_bridge_v2:id(BridgeType, Name),
  686. case query_resource_sync(Config, {ActionId, SentData}) of
  687. {error, {unrecoverable_error, _}} ->
  688. ok;
  689. ?RESOURCE_ERROR_M(not_connected, _) ->
  690. ok;
  691. Res ->
  692. ct:fail("unexpected result: ~p", [Res])
  693. end,
  694. ok
  695. end,
  696. []
  697. ),
  698. connect_and_create_table(Config),
  699. ok.
  700. t_concurrent_health_checks(Config) ->
  701. Name = ?config(pgsql_name, Config),
  702. BridgeType = ?config(pgsql_bridge_type, Config),
  703. ?check_trace(
  704. begin
  705. connect_and_create_table(Config),
  706. ?assertMatch({ok, _}, create_bridge(Config)),
  707. ?retry(
  708. _Sleep = 1_000,
  709. _Attempts = 20,
  710. ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
  711. ),
  712. emqx_utils:pmap(
  713. fun(_) ->
  714. ?assertMatch(
  715. #{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)
  716. )
  717. end,
  718. lists:seq(1, 20)
  719. ),
  720. ok
  721. end,
  722. fun(Trace) ->
  723. ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
  724. ok
  725. end
  726. ),
  727. ok.