emqx_bridge_pgsql_SUITE.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. % SQL definitions
  11. -define(SQL_BRIDGE,
  12. "INSERT INTO mqtt_test(payload, arrived) "
  13. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  14. ).
  15. -define(SQL_CREATE_TABLE,
  16. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  17. ).
  18. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  19. -define(SQL_DELETE, "DELETE from mqtt_test").
  20. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  21. % DB defaults
  22. -define(PGSQL_DATABASE, "mqtt").
  23. -define(PGSQL_USERNAME, "root").
  24. -define(PGSQL_PASSWORD, "public").
  25. -define(BATCH_SIZE, 10).
  26. %%------------------------------------------------------------------------------
  27. %% CT boilerplate
  28. %%------------------------------------------------------------------------------
  29. all() ->
  30. [
  31. {group, tcp},
  32. {group, tls}
  33. ].
  34. groups() ->
  35. TCs = emqx_common_test_helpers:all(?MODULE),
  36. NonBatchCases = [t_write_timeout],
  37. BatchVariantGroups = [
  38. {group, with_batch},
  39. {group, without_batch},
  40. {group, matrix},
  41. {group, timescale}
  42. ],
  43. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  44. [
  45. {tcp, QueryModeGroups},
  46. {tls, QueryModeGroups},
  47. {async, BatchVariantGroups},
  48. {sync, BatchVariantGroups},
  49. {with_batch, TCs -- NonBatchCases},
  50. {without_batch, TCs},
  51. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  52. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  53. ].
  54. init_per_group(tcp, Config) ->
  55. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  56. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  57. [
  58. {pgsql_host, Host},
  59. {pgsql_port, Port},
  60. {enable_tls, false},
  61. {proxy_name, "pgsql_tcp"}
  62. | Config
  63. ];
  64. init_per_group(tls, Config) ->
  65. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  66. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  67. [
  68. {pgsql_host, Host},
  69. {pgsql_port, Port},
  70. {enable_tls, true},
  71. {proxy_name, "pgsql_tls"}
  72. | Config
  73. ];
  74. init_per_group(async, Config) ->
  75. [{query_mode, async} | Config];
  76. init_per_group(sync, Config) ->
  77. [{query_mode, sync} | Config];
  78. init_per_group(with_batch, Config0) ->
  79. Config = [{enable_batch, true} | Config0],
  80. common_init(Config);
  81. init_per_group(without_batch, Config0) ->
  82. Config = [{enable_batch, false} | Config0],
  83. common_init(Config);
  84. init_per_group(matrix, Config0) ->
  85. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  86. common_init(Config);
  87. init_per_group(timescale, Config0) ->
  88. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  89. common_init(Config);
  90. init_per_group(_Group, Config) ->
  91. Config.
  92. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
  93. connect_and_drop_table(Config),
  94. ProxyHost = ?config(proxy_host, Config),
  95. ProxyPort = ?config(proxy_port, Config),
  96. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  97. ok;
  98. end_per_group(_Group, _Config) ->
  99. ok.
  100. init_per_suite(Config) ->
  101. Config.
  102. end_per_suite(_Config) ->
  103. emqx_mgmt_api_test_util:end_suite(),
  104. ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
  105. ok.
  106. init_per_testcase(_Testcase, Config) ->
  107. connect_and_clear_table(Config),
  108. delete_bridge(Config),
  109. snabbkaffe:start_trace(),
  110. Config.
  111. end_per_testcase(_Testcase, Config) ->
  112. ProxyHost = ?config(proxy_host, Config),
  113. ProxyPort = ?config(proxy_port, Config),
  114. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  115. connect_and_clear_table(Config),
  116. ok = snabbkaffe:stop(),
  117. delete_bridge(Config),
  118. ok.
  119. %%------------------------------------------------------------------------------
  120. %% Helper fns
  121. %%------------------------------------------------------------------------------
  122. common_init(Config0) ->
  123. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  124. Host = ?config(pgsql_host, Config0),
  125. Port = ?config(pgsql_port, Config0),
  126. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  127. true ->
  128. % Setup toxiproxy
  129. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  130. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  131. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  132. % Ensure enterprise bridge module is loaded
  133. ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
  134. _ = emqx_bridge_enterprise:module_info(),
  135. emqx_mgmt_api_test_util:init_suite(),
  136. % Connect to pgsql directly and create the table
  137. connect_and_create_table(Config0),
  138. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  139. Config =
  140. [
  141. {pgsql_config, PGConf},
  142. {pgsql_bridge_type, BridgeType},
  143. {pgsql_name, Name},
  144. {proxy_host, ProxyHost},
  145. {proxy_port, ProxyPort}
  146. | Config0
  147. ],
  148. Config;
  149. false ->
  150. case os:getenv("IS_CI") of
  151. "yes" ->
  152. throw(no_pgsql);
  153. _ ->
  154. {skip, no_pgsql}
  155. end
  156. end.
  157. pgsql_config(BridgeType, Config) ->
  158. Port = integer_to_list(?config(pgsql_port, Config)),
  159. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  160. Name = atom_to_binary(?MODULE),
  161. BatchSize =
  162. case ?config(enable_batch, Config) of
  163. true -> ?BATCH_SIZE;
  164. false -> 1
  165. end,
  166. QueryMode = ?config(query_mode, Config),
  167. TlsEnabled = ?config(enable_tls, Config),
  168. ConfigString =
  169. io_lib:format(
  170. "bridges.~s.~s {\n"
  171. " enable = true\n"
  172. " server = ~p\n"
  173. " database = ~p\n"
  174. " username = ~p\n"
  175. " password = ~p\n"
  176. " sql = ~p\n"
  177. " resource_opts = {\n"
  178. " request_ttl = 500ms\n"
  179. " batch_size = ~b\n"
  180. " query_mode = ~s\n"
  181. " }\n"
  182. " ssl = {\n"
  183. " enable = ~w\n"
  184. " }\n"
  185. "}",
  186. [
  187. BridgeType,
  188. Name,
  189. Server,
  190. ?PGSQL_DATABASE,
  191. ?PGSQL_USERNAME,
  192. ?PGSQL_PASSWORD,
  193. ?SQL_BRIDGE,
  194. BatchSize,
  195. QueryMode,
  196. TlsEnabled
  197. ]
  198. ),
  199. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  200. parse_and_check(ConfigString, BridgeType, Name) ->
  201. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  202. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  203. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  204. Config.
  205. create_bridge(Config) ->
  206. create_bridge(Config, _Overrides = #{}).
  207. create_bridge(Config, Overrides) ->
  208. BridgeType = ?config(pgsql_bridge_type, Config),
  209. Name = ?config(pgsql_name, Config),
  210. PGConfig0 = ?config(pgsql_config, Config),
  211. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  212. emqx_bridge:create(BridgeType, Name, PGConfig).
  213. delete_bridge(Config) ->
  214. BridgeType = ?config(pgsql_bridge_type, Config),
  215. Name = ?config(pgsql_name, Config),
  216. emqx_bridge:remove(BridgeType, Name).
  217. create_bridge_http(Params) ->
  218. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  219. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  220. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  221. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  222. Error -> Error
  223. end.
  224. send_message(Config, Payload) ->
  225. Name = ?config(pgsql_name, Config),
  226. BridgeType = ?config(pgsql_bridge_type, Config),
  227. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  228. emqx_bridge:send_message(BridgeID, Payload).
  229. query_resource(Config, Request) ->
  230. Name = ?config(pgsql_name, Config),
  231. BridgeType = ?config(pgsql_bridge_type, Config),
  232. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  233. emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
  234. query_resource_sync(Config, Request) ->
  235. Name = ?config(pgsql_name, Config),
  236. BridgeType = ?config(pgsql_bridge_type, Config),
  237. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  238. emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
  239. query_resource_async(Config, Request) ->
  240. query_resource_async(Config, Request, _Opts = #{}).
  241. query_resource_async(Config, Request, Opts) ->
  242. Name = ?config(pgsql_name, Config),
  243. BridgeType = ?config(pgsql_bridge_type, Config),
  244. Ref = alias([reply]),
  245. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  246. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  247. Timeout = maps:get(timeout, Opts, 500),
  248. Return = emqx_resource:query(ResourceID, Request, #{
  249. timeout => Timeout,
  250. async_reply_fun => {AsyncReplyFun, []}
  251. }),
  252. {Return, Ref}.
  253. receive_result(Ref, Timeout) ->
  254. receive
  255. {result, Ref, Result} ->
  256. {ok, Result};
  257. {Ref, Result} ->
  258. {ok, Result}
  259. after Timeout ->
  260. timeout
  261. end.
  262. connect_direct_pgsql(Config) ->
  263. Opts = #{
  264. host => ?config(pgsql_host, Config),
  265. port => ?config(pgsql_port, Config),
  266. username => ?PGSQL_USERNAME,
  267. password => ?PGSQL_PASSWORD,
  268. database => ?PGSQL_DATABASE
  269. },
  270. SslOpts =
  271. case ?config(enable_tls, Config) of
  272. true ->
  273. Opts#{
  274. ssl => true,
  275. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  276. };
  277. false ->
  278. Opts
  279. end,
  280. {ok, Con} = epgsql:connect(SslOpts),
  281. Con.
  282. % These funs connect and then stop the pgsql connection
  283. connect_and_create_table(Config) ->
  284. Con = connect_direct_pgsql(Config),
  285. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  286. ok = epgsql:close(Con).
  287. connect_and_drop_table(Config) ->
  288. Con = connect_direct_pgsql(Config),
  289. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  290. ok = epgsql:close(Con).
  291. connect_and_clear_table(Config) ->
  292. Con = connect_direct_pgsql(Config),
  293. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  294. ok = epgsql:close(Con).
  295. connect_and_get_payload(Config) ->
  296. Con = connect_direct_pgsql(Config),
  297. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  298. ok = epgsql:close(Con),
  299. Result.
  300. %%------------------------------------------------------------------------------
  301. %% Testcases
  302. %%------------------------------------------------------------------------------
  303. t_setup_via_config_and_publish(Config) ->
  304. ?assertMatch(
  305. {ok, _},
  306. create_bridge(Config)
  307. ),
  308. Val = integer_to_binary(erlang:unique_integer()),
  309. SentData = #{payload => Val, timestamp => 1668602148000},
  310. ?check_trace(
  311. begin
  312. {_, {ok, _}} =
  313. ?wait_async_action(
  314. send_message(Config, SentData),
  315. #{?snk_kind := pgsql_connector_query_return},
  316. 10_000
  317. ),
  318. ?assertMatch(
  319. Val,
  320. connect_and_get_payload(Config)
  321. ),
  322. ok
  323. end,
  324. fun(Trace0) ->
  325. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  326. case ?config(enable_batch, Config) of
  327. true ->
  328. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  329. false ->
  330. ?assertMatch([#{result := {ok, 1}}], Trace)
  331. end,
  332. ok
  333. end
  334. ),
  335. ok.
  336. t_setup_via_http_api_and_publish(Config) ->
  337. BridgeType = ?config(pgsql_bridge_type, Config),
  338. Name = ?config(pgsql_name, Config),
  339. PgsqlConfig0 = ?config(pgsql_config, Config),
  340. QueryMode = ?config(query_mode, Config),
  341. PgsqlConfig = PgsqlConfig0#{
  342. <<"name">> => Name,
  343. <<"type">> => BridgeType
  344. },
  345. ?assertMatch(
  346. {ok, _},
  347. create_bridge_http(PgsqlConfig)
  348. ),
  349. Val = integer_to_binary(erlang:unique_integer()),
  350. SentData = #{payload => Val, timestamp => 1668602148000},
  351. ?check_trace(
  352. begin
  353. {Res, {ok, _}} =
  354. ?wait_async_action(
  355. send_message(Config, SentData),
  356. #{?snk_kind := pgsql_connector_query_return},
  357. 10_000
  358. ),
  359. case QueryMode of
  360. async ->
  361. ok;
  362. sync ->
  363. ?assertEqual({ok, 1}, Res)
  364. end,
  365. ?assertMatch(
  366. Val,
  367. connect_and_get_payload(Config)
  368. ),
  369. ok
  370. end,
  371. fun(Trace0) ->
  372. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  373. case ?config(enable_batch, Config) of
  374. true ->
  375. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  376. false ->
  377. ?assertMatch([#{result := {ok, 1}}], Trace)
  378. end,
  379. ok
  380. end
  381. ),
  382. ok.
  383. t_get_status(Config) ->
  384. ?assertMatch(
  385. {ok, _},
  386. create_bridge(Config)
  387. ),
  388. ProxyPort = ?config(proxy_port, Config),
  389. ProxyHost = ?config(proxy_host, Config),
  390. ProxyName = ?config(proxy_name, Config),
  391. Name = ?config(pgsql_name, Config),
  392. BridgeType = ?config(pgsql_bridge_type, Config),
  393. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  394. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
  395. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  396. ?assertMatch(
  397. {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
  398. emqx_resource_manager:health_check(ResourceID)
  399. )
  400. end),
  401. ok.
  402. t_create_disconnected(Config) ->
  403. ProxyPort = ?config(proxy_port, Config),
  404. ProxyHost = ?config(proxy_host, Config),
  405. ProxyName = ?config(proxy_name, Config),
  406. ?check_trace(
  407. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  408. ?assertMatch({ok, _}, create_bridge(Config))
  409. end),
  410. fun(Trace) ->
  411. ?assertMatch(
  412. [#{error := {start_pool_failed, _, _}}],
  413. ?of_kind(pgsql_connector_start_failed, Trace)
  414. ),
  415. ok
  416. end
  417. ),
  418. ok.
  419. t_write_failure(Config) ->
  420. ProxyName = ?config(proxy_name, Config),
  421. ProxyPort = ?config(proxy_port, Config),
  422. ProxyHost = ?config(proxy_host, Config),
  423. QueryMode = ?config(query_mode, Config),
  424. {ok, _} = create_bridge(Config),
  425. Val = integer_to_binary(erlang:unique_integer()),
  426. SentData = #{payload => Val, timestamp => 1668602148000},
  427. ?check_trace(
  428. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  429. {_, {ok, _}} =
  430. ?wait_async_action(
  431. case QueryMode of
  432. sync ->
  433. ?assertMatch({error, _}, send_message(Config, SentData));
  434. async ->
  435. send_message(Config, SentData)
  436. end,
  437. #{?snk_kind := buffer_worker_flush_nack},
  438. 1_000
  439. )
  440. end),
  441. fun(Trace0) ->
  442. ct:pal("trace: ~p", [Trace0]),
  443. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  444. ?assertMatch([#{result := {error, _}} | _], Trace),
  445. [#{result := {error, Error}} | _] = Trace,
  446. case Error of
  447. {resource_error, _} ->
  448. ok;
  449. {recoverable_error, disconnected} ->
  450. ok;
  451. _ ->
  452. ct:fail("unexpected error: ~p", [Error])
  453. end
  454. end
  455. ),
  456. ok.
  457. % This test doesn't work with batch enabled since it is not possible
  458. % to set the timeout directly for batch queries
  459. t_write_timeout(Config) ->
  460. ProxyName = ?config(proxy_name, Config),
  461. ProxyPort = ?config(proxy_port, Config),
  462. ProxyHost = ?config(proxy_host, Config),
  463. QueryMode = ?config(query_mode, Config),
  464. {ok, _} = create_bridge(
  465. Config,
  466. #{
  467. <<"resource_opts">> => #{
  468. <<"resume_interval">> => <<"100ms">>,
  469. <<"health_check_interval">> => <<"100ms">>
  470. }
  471. }
  472. ),
  473. Val = integer_to_binary(erlang:unique_integer()),
  474. SentData = #{payload => Val, timestamp => 1668602148000},
  475. {ok, SRef} = snabbkaffe:subscribe(
  476. ?match_event(#{?snk_kind := call_query_enter}),
  477. 2_000
  478. ),
  479. Res0 =
  480. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  481. Res1 =
  482. case QueryMode of
  483. async ->
  484. query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
  485. sync ->
  486. query_resource(Config, {send_message, SentData})
  487. end,
  488. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  489. Res1
  490. end),
  491. case Res0 of
  492. {_, Ref} when is_reference(Ref) ->
  493. case receive_result(Ref, 15_000) of
  494. {ok, Res} ->
  495. %% we may receive a successful result depending on
  496. %% timing, if the request is retried after the
  497. %% failure is healed.
  498. case Res of
  499. {error, {unrecoverable_error, _}} ->
  500. ok;
  501. {ok, _} ->
  502. ok;
  503. _ ->
  504. ct:fail("unexpected result: ~p", [Res])
  505. end;
  506. timeout ->
  507. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  508. ct:fail("no response received")
  509. end;
  510. _ ->
  511. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  512. end,
  513. ok.
  514. t_simple_sql_query(Config) ->
  515. EnableBatch = ?config(enable_batch, Config),
  516. QueryMode = ?config(query_mode, Config),
  517. ?assertMatch(
  518. {ok, _},
  519. create_bridge(Config)
  520. ),
  521. Request = {sql, <<"SELECT count(1) AS T">>},
  522. Result =
  523. case QueryMode of
  524. sync ->
  525. query_resource(Config, Request);
  526. async ->
  527. {_, Ref} = query_resource_async(Config, Request),
  528. {ok, Res} = receive_result(Ref, 2_000),
  529. Res
  530. end,
  531. case EnableBatch of
  532. true ->
  533. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  534. false ->
  535. ?assertMatch({ok, _, [{1}]}, Result)
  536. end,
  537. ok.
  538. t_missing_data(Config) ->
  539. ?assertMatch(
  540. {ok, _},
  541. create_bridge(Config)
  542. ),
  543. {_, {ok, Event}} =
  544. ?wait_async_action(
  545. send_message(Config, #{}),
  546. #{?snk_kind := buffer_worker_flush_ack},
  547. 2_000
  548. ),
  549. ?assertMatch(
  550. #{
  551. result :=
  552. {error,
  553. {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
  554. },
  555. Event
  556. ),
  557. ok.
  558. t_bad_sql_parameter(Config) ->
  559. QueryMode = ?config(query_mode, Config),
  560. EnableBatch = ?config(enable_batch, Config),
  561. ?assertMatch(
  562. {ok, _},
  563. create_bridge(Config)
  564. ),
  565. Request = {sql, <<"">>, [bad_parameter]},
  566. Result =
  567. case QueryMode of
  568. sync ->
  569. query_resource(Config, Request);
  570. async ->
  571. {_, Ref} = query_resource_async(Config, Request),
  572. {ok, Res} = receive_result(Ref, 2_000),
  573. Res
  574. end,
  575. case EnableBatch of
  576. true ->
  577. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  578. false ->
  579. ?assertMatch(
  580. {error, {unrecoverable_error, _}}, Result
  581. )
  582. end,
  583. ok.
  584. t_nasty_sql_string(Config) ->
  585. ?assertMatch({ok, _}, create_bridge(Config)),
  586. Payload = list_to_binary(lists:seq(1, 127)),
  587. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  588. {_, {ok, _}} =
  589. ?wait_async_action(
  590. send_message(Config, Message),
  591. #{?snk_kind := pgsql_connector_query_return},
  592. 1_000
  593. ),
  594. ?assertEqual(Payload, connect_and_get_payload(Config)).
  595. t_missing_table(Config) ->
  596. Name = ?config(pgsql_name, Config),
  597. BridgeType = ?config(pgsql_bridge_type, Config),
  598. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  599. ?check_trace(
  600. begin
  601. connect_and_drop_table(Config),
  602. ?assertMatch({ok, _}, create_bridge(Config)),
  603. ?retry(
  604. _Sleep = 1_000,
  605. _Attempts = 20,
  606. ?assertMatch(
  607. {ok, Status} when Status == connecting orelse Status == disconnected,
  608. emqx_resource_manager:health_check(ResourceID)
  609. )
  610. ),
  611. Val = integer_to_binary(erlang:unique_integer()),
  612. SentData = #{payload => Val, timestamp => 1668602148000},
  613. Timeout = 1000,
  614. ?assertMatch(
  615. {error, {resource_error, #{reason := unhealthy_target}}},
  616. query_resource(Config, {send_message, SentData, [], Timeout})
  617. ),
  618. ok
  619. end,
  620. fun(Trace) ->
  621. ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
  622. ok
  623. end
  624. ),
  625. connect_and_create_table(Config),
  626. ok.
  627. t_table_removed(Config) ->
  628. Name = ?config(pgsql_name, Config),
  629. BridgeType = ?config(pgsql_bridge_type, Config),
  630. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  631. ?check_trace(
  632. begin
  633. connect_and_create_table(Config),
  634. ?assertMatch({ok, _}, create_bridge(Config)),
  635. ?retry(
  636. _Sleep = 1_000,
  637. _Attempts = 20,
  638. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
  639. ),
  640. connect_and_drop_table(Config),
  641. Val = integer_to_binary(erlang:unique_integer()),
  642. SentData = #{payload => Val, timestamp => 1668602148000},
  643. ?assertMatch(
  644. {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}},
  645. query_resource_sync(Config, {send_message, SentData, []})
  646. ),
  647. ok
  648. end,
  649. []
  650. ),
  651. connect_and_create_table(Config),
  652. ok.