emqx_bridge_pgsql_SUITE.erl 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
% SQL definitions
%% Statement template passed as the bridge's `sql` option by pgsql_config/2.
%% NOTE(review): the ${payload} / ${timestamp} placeholders are presumably
%% filled in by the bridge from each message, and `timestamp` looks like epoch
%% milliseconds (it is divided by 1000 before TO_TIMESTAMP) -- confirm.
-define(SQL_BRIDGE,
    "INSERT INTO mqtt_test(payload, arrived) "
    "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
).
%% Creates the table used by every testcase; `arrived` is declared NOT NULL.
-define(SQL_CREATE_TABLE,
    "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
).
%% No IF EXISTS here: this statement fails when the table is already gone.
-define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
%% Removes all rows but keeps the table.
-define(SQL_DELETE, "DELETE from mqtt_test").
-define(SQL_SELECT, "SELECT payload FROM mqtt_test").
% DB defaults
%% Credentials used both for the direct epgsql connection and in the bridge config.
-define(PGSQL_DATABASE, "mqtt").
-define(PGSQL_USERNAME, "root").
-define(PGSQL_PASSWORD, "public").
%% Batch size configured when the `enable_batch` flag is true (1 otherwise).
-define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. [
  32. {group, tcp},
  33. {group, tls}
  34. ].
  35. groups() ->
  36. TCs = emqx_common_test_helpers:all(?MODULE),
  37. NonBatchCases = [t_write_timeout],
  38. BatchVariantGroups = [
  39. {group, with_batch},
  40. {group, without_batch},
  41. {group, matrix},
  42. {group, timescale}
  43. ],
  44. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  45. [
  46. {tcp, QueryModeGroups},
  47. {tls, QueryModeGroups},
  48. {async, BatchVariantGroups},
  49. {sync, BatchVariantGroups},
  50. {with_batch, TCs -- NonBatchCases},
  51. {without_batch, TCs},
  52. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  53. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  54. ].
  55. init_per_group(tcp, Config) ->
  56. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  57. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  58. [
  59. {pgsql_host, Host},
  60. {pgsql_port, Port},
  61. {enable_tls, false},
  62. {proxy_name, "pgsql_tcp"}
  63. | Config
  64. ];
  65. init_per_group(tls, Config) ->
  66. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  67. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  68. [
  69. {pgsql_host, Host},
  70. {pgsql_port, Port},
  71. {enable_tls, true},
  72. {proxy_name, "pgsql_tls"}
  73. | Config
  74. ];
  75. init_per_group(async, Config) ->
  76. [{query_mode, async} | Config];
  77. init_per_group(sync, Config) ->
  78. [{query_mode, sync} | Config];
  79. init_per_group(with_batch, Config0) ->
  80. Config = [{enable_batch, true} | Config0],
  81. common_init(Config);
  82. init_per_group(without_batch, Config0) ->
  83. Config = [{enable_batch, false} | Config0],
  84. common_init(Config);
  85. init_per_group(matrix, Config0) ->
  86. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  87. common_init(Config);
  88. init_per_group(timescale, Config0) ->
  89. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  90. common_init(Config);
  91. init_per_group(_Group, Config) ->
  92. Config.
  93. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
  94. connect_and_drop_table(Config),
  95. ProxyHost = ?config(proxy_host, Config),
  96. ProxyPort = ?config(proxy_port, Config),
  97. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  98. ok;
  99. end_per_group(_Group, _Config) ->
  100. ok.
%% Suite-level setup is a no-op: applications are started per group by
%% common_init/1, so Config is returned unchanged.
init_per_suite(Config) ->
    Config.
  103. end_per_suite(_Config) ->
  104. emqx_mgmt_api_test_util:end_suite(),
  105. ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
  106. ok.
%% Per-testcase setup: start from an empty table and no bridge, then start a
%% fresh snabbkaffe trace. Returns Config unchanged.
init_per_testcase(_Testcase, Config) ->
    %% Deletes every row of mqtt_test; the table itself is kept.
    connect_and_clear_table(Config),
    %% Removes the bridge named in Config; the result is ignored.
    delete_bridge(Config),
    snabbkaffe:start_trace(),
    Config.
  112. end_per_testcase(_Testcase, Config) ->
  113. ProxyHost = ?config(proxy_host, Config),
  114. ProxyPort = ?config(proxy_port, Config),
  115. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  116. connect_and_clear_table(Config),
  117. ok = snabbkaffe:stop(),
  118. delete_bridge(Config),
  119. ok.
  120. %%------------------------------------------------------------------------------
  121. %% Helper fns
  122. %%------------------------------------------------------------------------------
  123. common_init(Config0) ->
  124. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  125. Host = ?config(pgsql_host, Config0),
  126. Port = ?config(pgsql_port, Config0),
  127. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  128. true ->
  129. % Setup toxiproxy
  130. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  131. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  132. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  133. % Ensure enterprise bridge module is loaded
  134. ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
  135. _ = emqx_bridge_enterprise:module_info(),
  136. emqx_mgmt_api_test_util:init_suite(),
  137. % Connect to pgsql directly and create the table
  138. connect_and_create_table(Config0),
  139. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  140. Config =
  141. [
  142. {pgsql_config, PGConf},
  143. {pgsql_bridge_type, BridgeType},
  144. {pgsql_name, Name},
  145. {proxy_host, ProxyHost},
  146. {proxy_port, ProxyPort}
  147. | Config0
  148. ],
  149. Config;
  150. false ->
  151. case os:getenv("IS_CI") of
  152. "yes" ->
  153. throw(no_pgsql);
  154. _ ->
  155. {skip, no_pgsql}
  156. end
  157. end.
  158. pgsql_config(BridgeType, Config) ->
  159. Port = integer_to_list(?config(pgsql_port, Config)),
  160. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  161. Name = atom_to_binary(?MODULE),
  162. BatchSize =
  163. case ?config(enable_batch, Config) of
  164. true -> ?BATCH_SIZE;
  165. false -> 1
  166. end,
  167. QueryMode = ?config(query_mode, Config),
  168. TlsEnabled = ?config(enable_tls, Config),
  169. ConfigString =
  170. io_lib:format(
  171. "bridges.~s.~s {\n"
  172. " enable = true\n"
  173. " server = ~p\n"
  174. " database = ~p\n"
  175. " username = ~p\n"
  176. " password = ~p\n"
  177. " sql = ~p\n"
  178. " resource_opts = {\n"
  179. " request_ttl = 500ms\n"
  180. " batch_size = ~b\n"
  181. " query_mode = ~s\n"
  182. " }\n"
  183. " ssl = {\n"
  184. " enable = ~w\n"
  185. " }\n"
  186. "}",
  187. [
  188. BridgeType,
  189. Name,
  190. Server,
  191. ?PGSQL_DATABASE,
  192. ?PGSQL_USERNAME,
  193. ?PGSQL_PASSWORD,
  194. ?SQL_BRIDGE,
  195. BatchSize,
  196. QueryMode,
  197. TlsEnabled
  198. ]
  199. ),
  200. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  201. parse_and_check(ConfigString, BridgeType, Name) ->
  202. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  203. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  204. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  205. Config.
  206. create_bridge(Config) ->
  207. create_bridge(Config, _Overrides = #{}).
  208. create_bridge(Config, Overrides) ->
  209. BridgeType = ?config(pgsql_bridge_type, Config),
  210. Name = ?config(pgsql_name, Config),
  211. PGConfig0 = ?config(pgsql_config, Config),
  212. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  213. emqx_bridge:create(BridgeType, Name, PGConfig).
  214. delete_bridge(Config) ->
  215. BridgeType = ?config(pgsql_bridge_type, Config),
  216. Name = ?config(pgsql_name, Config),
  217. emqx_bridge:remove(BridgeType, Name).
  218. create_bridge_http(Params) ->
  219. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  220. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  221. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  222. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  223. Error -> Error
  224. end.
  225. send_message(Config, Payload) ->
  226. Name = ?config(pgsql_name, Config),
  227. BridgeType = ?config(pgsql_bridge_type, Config),
  228. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  229. emqx_bridge:send_message(BridgeID, Payload).
  230. query_resource(Config, Request) ->
  231. Name = ?config(pgsql_name, Config),
  232. BridgeType = ?config(pgsql_bridge_type, Config),
  233. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  234. emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
  235. query_resource_sync(Config, Request) ->
  236. Name = ?config(pgsql_name, Config),
  237. BridgeType = ?config(pgsql_bridge_type, Config),
  238. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  239. emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
  240. query_resource_async(Config, Request) ->
  241. query_resource_async(Config, Request, _Opts = #{}).
  242. query_resource_async(Config, Request, Opts) ->
  243. Name = ?config(pgsql_name, Config),
  244. BridgeType = ?config(pgsql_bridge_type, Config),
  245. Ref = alias([reply]),
  246. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  247. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  248. Timeout = maps:get(timeout, Opts, 500),
  249. Return = emqx_resource:query(ResourceID, Request, #{
  250. timeout => Timeout,
  251. async_reply_fun => {AsyncReplyFun, []}
  252. }),
  253. {Return, Ref}.
  254. receive_result(Ref, Timeout) ->
  255. receive
  256. {result, Ref, Result} ->
  257. {ok, Result};
  258. {Ref, Result} ->
  259. {ok, Result}
  260. after Timeout ->
  261. timeout
  262. end.
  263. connect_direct_pgsql(Config) ->
  264. Opts = #{
  265. host => ?config(pgsql_host, Config),
  266. port => ?config(pgsql_port, Config),
  267. username => ?PGSQL_USERNAME,
  268. password => ?PGSQL_PASSWORD,
  269. database => ?PGSQL_DATABASE
  270. },
  271. SslOpts =
  272. case ?config(enable_tls, Config) of
  273. true ->
  274. Opts#{
  275. ssl => true,
  276. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  277. };
  278. false ->
  279. Opts
  280. end,
  281. {ok, Con} = epgsql:connect(SslOpts),
  282. Con.
  283. % These funs connect and then stop the pgsql connection
  284. connect_and_create_table(Config) ->
  285. Con = connect_direct_pgsql(Config),
  286. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  287. ok = epgsql:close(Con).
  288. connect_and_drop_table(Config) ->
  289. Con = connect_direct_pgsql(Config),
  290. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  291. ok = epgsql:close(Con).
  292. connect_and_clear_table(Config) ->
  293. Con = connect_direct_pgsql(Config),
  294. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  295. ok = epgsql:close(Con).
  296. connect_and_get_payload(Config) ->
  297. Con = connect_direct_pgsql(Config),
  298. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  299. ok = epgsql:close(Con),
  300. Result.
  301. %%------------------------------------------------------------------------------
  302. %% Testcases
  303. %%------------------------------------------------------------------------------
  304. t_setup_via_config_and_publish(Config) ->
  305. ?assertMatch(
  306. {ok, _},
  307. create_bridge(Config)
  308. ),
  309. Val = integer_to_binary(erlang:unique_integer()),
  310. SentData = #{payload => Val, timestamp => 1668602148000},
  311. ?check_trace(
  312. begin
  313. {_, {ok, _}} =
  314. ?wait_async_action(
  315. send_message(Config, SentData),
  316. #{?snk_kind := pgsql_connector_query_return},
  317. 10_000
  318. ),
  319. ?assertMatch(
  320. Val,
  321. connect_and_get_payload(Config)
  322. ),
  323. ok
  324. end,
  325. fun(Trace0) ->
  326. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  327. case ?config(enable_batch, Config) of
  328. true ->
  329. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  330. false ->
  331. ?assertMatch([#{result := {ok, 1}}], Trace)
  332. end,
  333. ok
  334. end
  335. ),
  336. ok.
  337. t_setup_via_http_api_and_publish(Config) ->
  338. BridgeType = ?config(pgsql_bridge_type, Config),
  339. Name = ?config(pgsql_name, Config),
  340. PgsqlConfig0 = ?config(pgsql_config, Config),
  341. QueryMode = ?config(query_mode, Config),
  342. PgsqlConfig = PgsqlConfig0#{
  343. <<"name">> => Name,
  344. <<"type">> => BridgeType
  345. },
  346. ?assertMatch(
  347. {ok, _},
  348. create_bridge_http(PgsqlConfig)
  349. ),
  350. Val = integer_to_binary(erlang:unique_integer()),
  351. SentData = #{payload => Val, timestamp => 1668602148000},
  352. ?check_trace(
  353. begin
  354. {Res, {ok, _}} =
  355. ?wait_async_action(
  356. send_message(Config, SentData),
  357. #{?snk_kind := pgsql_connector_query_return},
  358. 10_000
  359. ),
  360. case QueryMode of
  361. async ->
  362. ok;
  363. sync ->
  364. ?assertEqual({ok, 1}, Res)
  365. end,
  366. ?assertMatch(
  367. Val,
  368. connect_and_get_payload(Config)
  369. ),
  370. ok
  371. end,
  372. fun(Trace0) ->
  373. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  374. case ?config(enable_batch, Config) of
  375. true ->
  376. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  377. false ->
  378. ?assertMatch([#{result := {ok, 1}}], Trace)
  379. end,
  380. ok
  381. end
  382. ),
  383. ok.
  384. t_get_status(Config) ->
  385. ?assertMatch(
  386. {ok, _},
  387. create_bridge(Config)
  388. ),
  389. ProxyPort = ?config(proxy_port, Config),
  390. ProxyHost = ?config(proxy_host, Config),
  391. ProxyName = ?config(proxy_name, Config),
  392. Name = ?config(pgsql_name, Config),
  393. BridgeType = ?config(pgsql_bridge_type, Config),
  394. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  395. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
  396. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  397. ?assertMatch(
  398. {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
  399. emqx_resource_manager:health_check(ResourceID)
  400. )
  401. end),
  402. ok.
  403. t_create_disconnected(Config) ->
  404. ProxyPort = ?config(proxy_port, Config),
  405. ProxyHost = ?config(proxy_host, Config),
  406. ProxyName = ?config(proxy_name, Config),
  407. ?check_trace(
  408. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  409. ?assertMatch({ok, _}, create_bridge(Config))
  410. end),
  411. fun(Trace) ->
  412. ?assertMatch(
  413. [#{error := {start_pool_failed, _, _}}],
  414. ?of_kind(pgsql_connector_start_failed, Trace)
  415. ),
  416. ok
  417. end
  418. ),
  419. ok.
  420. t_write_failure(Config) ->
  421. ProxyName = ?config(proxy_name, Config),
  422. ProxyPort = ?config(proxy_port, Config),
  423. ProxyHost = ?config(proxy_host, Config),
  424. QueryMode = ?config(query_mode, Config),
  425. {ok, _} = create_bridge(Config),
  426. Val = integer_to_binary(erlang:unique_integer()),
  427. SentData = #{payload => Val, timestamp => 1668602148000},
  428. ?check_trace(
  429. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  430. {_, {ok, _}} =
  431. ?wait_async_action(
  432. case QueryMode of
  433. sync ->
  434. ?assertMatch({error, _}, send_message(Config, SentData));
  435. async ->
  436. send_message(Config, SentData)
  437. end,
  438. #{?snk_kind := buffer_worker_flush_nack},
  439. 15_000
  440. )
  441. end),
  442. fun(Trace0) ->
  443. ct:pal("trace: ~p", [Trace0]),
  444. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  445. ?assertMatch([#{result := {error, _}} | _], Trace),
  446. [#{result := {error, Error}} | _] = Trace,
  447. case Error of
  448. {resource_error, _} ->
  449. ok;
  450. {recoverable_error, disconnected} ->
  451. ok;
  452. _ ->
  453. ct:fail("unexpected error: ~p", [Error])
  454. end
  455. end
  456. ),
  457. ok.
  458. % This test doesn't work with batch enabled since it is not possible
  459. % to set the timeout directly for batch queries
  460. t_write_timeout(Config) ->
  461. ProxyName = ?config(proxy_name, Config),
  462. ProxyPort = ?config(proxy_port, Config),
  463. ProxyHost = ?config(proxy_host, Config),
  464. QueryMode = ?config(query_mode, Config),
  465. {ok, _} = create_bridge(
  466. Config,
  467. #{
  468. <<"resource_opts">> => #{
  469. <<"resume_interval">> => <<"100ms">>,
  470. <<"health_check_interval">> => <<"100ms">>
  471. }
  472. }
  473. ),
  474. Val = integer_to_binary(erlang:unique_integer()),
  475. SentData = #{payload => Val, timestamp => 1668602148000},
  476. {ok, SRef} = snabbkaffe:subscribe(
  477. ?match_event(#{?snk_kind := call_query_enter}),
  478. 2_000
  479. ),
  480. Res0 =
  481. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  482. Res1 =
  483. case QueryMode of
  484. async ->
  485. query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
  486. sync ->
  487. query_resource(Config, {send_message, SentData})
  488. end,
  489. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  490. Res1
  491. end),
  492. case Res0 of
  493. {_, Ref} when is_reference(Ref) ->
  494. case receive_result(Ref, 15_000) of
  495. {ok, Res} ->
  496. %% we may receive a successful result depending on
  497. %% timing, if the request is retried after the
  498. %% failure is healed.
  499. case Res of
  500. {error, {unrecoverable_error, _}} ->
  501. ok;
  502. {ok, _} ->
  503. ok;
  504. _ ->
  505. ct:fail("unexpected result: ~p", [Res])
  506. end;
  507. timeout ->
  508. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  509. ct:fail("no response received")
  510. end;
  511. _ ->
  512. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  513. end,
  514. ok.
  515. t_simple_sql_query(Config) ->
  516. EnableBatch = ?config(enable_batch, Config),
  517. QueryMode = ?config(query_mode, Config),
  518. ?assertMatch(
  519. {ok, _},
  520. create_bridge(Config)
  521. ),
  522. Request = {sql, <<"SELECT count(1) AS T">>},
  523. Result =
  524. case QueryMode of
  525. sync ->
  526. query_resource(Config, Request);
  527. async ->
  528. {_, Ref} = query_resource_async(Config, Request),
  529. {ok, Res} = receive_result(Ref, 2_000),
  530. Res
  531. end,
  532. case EnableBatch of
  533. true ->
  534. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  535. false ->
  536. ?assertMatch({ok, _, [{1}]}, Result)
  537. end,
  538. ok.
  539. t_missing_data(Config) ->
  540. ?assertMatch(
  541. {ok, _},
  542. create_bridge(Config)
  543. ),
  544. {_, {ok, Event}} =
  545. ?wait_async_action(
  546. send_message(Config, #{}),
  547. #{?snk_kind := buffer_worker_flush_ack},
  548. 2_000
  549. ),
  550. ?assertMatch(
  551. #{
  552. result :=
  553. {error,
  554. {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
  555. },
  556. Event
  557. ),
  558. ok.
  559. t_bad_sql_parameter(Config) ->
  560. QueryMode = ?config(query_mode, Config),
  561. EnableBatch = ?config(enable_batch, Config),
  562. ?assertMatch(
  563. {ok, _},
  564. create_bridge(Config)
  565. ),
  566. Request = {sql, <<"">>, [bad_parameter]},
  567. Result =
  568. case QueryMode of
  569. sync ->
  570. query_resource(Config, Request);
  571. async ->
  572. {_, Ref} = query_resource_async(Config, Request),
  573. {ok, Res} = receive_result(Ref, 2_000),
  574. Res
  575. end,
  576. case EnableBatch of
  577. true ->
  578. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  579. false ->
  580. ?assertMatch(
  581. {error, {unrecoverable_error, _}}, Result
  582. )
  583. end,
  584. ok.
  585. t_nasty_sql_string(Config) ->
  586. ?assertMatch({ok, _}, create_bridge(Config)),
  587. Payload = list_to_binary(lists:seq(1, 127)),
  588. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  589. {_, {ok, _}} =
  590. ?wait_async_action(
  591. send_message(Config, Message),
  592. #{?snk_kind := pgsql_connector_query_return},
  593. 1_000
  594. ),
  595. ?assertEqual(Payload, connect_and_get_payload(Config)).
  596. t_missing_table(Config) ->
  597. Name = ?config(pgsql_name, Config),
  598. BridgeType = ?config(pgsql_bridge_type, Config),
  599. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  600. ?check_trace(
  601. begin
  602. connect_and_drop_table(Config),
  603. ?assertMatch({ok, _}, create_bridge(Config)),
  604. ?retry(
  605. _Sleep = 1_000,
  606. _Attempts = 20,
  607. ?assertMatch(
  608. {ok, Status} when Status == connecting orelse Status == disconnected,
  609. emqx_resource_manager:health_check(ResourceID)
  610. )
  611. ),
  612. Val = integer_to_binary(erlang:unique_integer()),
  613. SentData = #{payload => Val, timestamp => 1668602148000},
  614. Timeout = 1000,
  615. ?assertMatch(
  616. {error, {resource_error, #{reason := unhealthy_target}}},
  617. query_resource(Config, {send_message, SentData, [], Timeout})
  618. ),
  619. ok
  620. end,
  621. fun(Trace) ->
  622. ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
  623. ok
  624. end
  625. ),
  626. connect_and_create_table(Config),
  627. ok.
  628. t_table_removed(Config) ->
  629. Name = ?config(pgsql_name, Config),
  630. BridgeType = ?config(pgsql_bridge_type, Config),
  631. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  632. ?check_trace(
  633. begin
  634. connect_and_create_table(Config),
  635. ?assertMatch({ok, _}, create_bridge(Config)),
  636. ?retry(
  637. _Sleep = 1_000,
  638. _Attempts = 20,
  639. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  640. ),
  641. connect_and_drop_table(Config),
  642. Val = integer_to_binary(erlang:unique_integer()),
  643. SentData = #{payload => Val, timestamp => 1668602148000},
  644. case query_resource_sync(Config, {send_message, SentData, []}) of
  645. {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
  646. ok;
  647. ?RESOURCE_ERROR_M(not_connected, _) ->
  648. ok;
  649. Res ->
  650. ct:fail("unexpected result: ~p", [Res])
  651. end,
  652. ok
  653. end,
  654. []
  655. ),
  656. connect_and_create_table(Config),
  657. ok.
  658. t_concurrent_health_checks(Config) ->
  659. Name = ?config(pgsql_name, Config),
  660. BridgeType = ?config(pgsql_bridge_type, Config),
  661. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  662. ?check_trace(
  663. begin
  664. connect_and_create_table(Config),
  665. ?assertMatch({ok, _}, create_bridge(Config)),
  666. ?retry(
  667. _Sleep = 1_000,
  668. _Attempts = 20,
  669. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  670. ),
  671. emqx_utils:pmap(
  672. fun(_) ->
  673. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  674. end,
  675. lists:seq(1, 20)
  676. ),
  677. ok
  678. end,
  679. fun(Trace) ->
  680. ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
  681. ok
  682. end
  683. ),
  684. ok.