%% emqx_bridge_pgsql_SUITE.erl (25 KB)
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("emqx_resource_errors.hrl").
  11. % SQL definitions
  12. -define(SQL_BRIDGE,
  13. "INSERT INTO mqtt_test(payload, arrived) "
  14. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  15. ).
  16. -define(SQL_CREATE_TABLE,
  17. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  18. ).
  19. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  20. -define(SQL_DELETE, "DELETE from mqtt_test").
  21. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  22. % DB defaults
  23. -define(PGSQL_DATABASE, "mqtt").
  24. -define(PGSQL_USERNAME, "root").
  25. -define(PGSQL_PASSWORD, "public").
  26. -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  %% CT entry point: the suite is executed once for each transport group.
  all() ->
      [{group, Transport} || Transport <- [tcp, tls]].
  %% Declares the CT group tree. The transport groups (tcp/tls) contain the
  %% query-mode groups (async/sync), which in turn reference the four variant
  %% groups (with_batch, without_batch, matrix, timescale).
  groups() ->
      AllCases = emqx_common_test_helpers:all(?MODULE),
      %% Cases listed here are left out of the with_batch group.
      UnbatchedOnly = [t_write_timeout],
      PublishCases = [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish],
      Variants = [{group, Variant} || Variant <- [with_batch, without_batch, matrix, timescale]],
      ModeGroups = [{async, Variants}, {sync, Variants}],
      [
          {tcp, ModeGroups},
          {tls, ModeGroups},
          {async, Variants},
          {sync, Variants},
          {with_batch, AllCases -- UnbatchedOnly},
          {without_batch, AllCases},
          {matrix, PublishCases},
          {timescale, PublishCases}
      ].
  %% Per-group setup. Each level of the group tree prepends its own entries to
  %% the CT config: transport (tcp/tls), query mode (async/sync) and finally the
  %% batch / bridge-type variant, which continues into common_init/1.
  %% Any other group returns the config untouched.
  init_per_group(tcp, Config) ->
      pgsql_endpoint_props("PGSQL_TCP_HOST", "PGSQL_TCP_PORT", "5432", false, "pgsql_tcp") ++
          Config;
  init_per_group(tls, Config) ->
      pgsql_endpoint_props("PGSQL_TLS_HOST", "PGSQL_TLS_PORT", "5433", true, "pgsql_tls") ++
          Config;
  init_per_group(QueryMode, Config) when QueryMode =:= async; QueryMode =:= sync ->
      [{query_mode, QueryMode} | Config];
  init_per_group(with_batch, Config0) ->
      common_init([{enable_batch, true} | Config0]);
  init_per_group(without_batch, Config0) ->
      common_init([{enable_batch, false} | Config0]);
  init_per_group(Variant, Config0) when Variant =:= matrix; Variant =:= timescale ->
      %% The bridge type is the group name as a binary (<<"matrix">> / <<"timescale">>).
      common_init([{bridge_type, atom_to_binary(Variant)}, {enable_batch, true} | Config0]);
  init_per_group(_Group, Config) ->
      Config.

  %% Builds the config entries describing one PostgreSQL endpoint. Host and port
  %% are read from the given environment variables; the host falls back to
  %% "toxiproxy" and the port to DefaultPort (a string).
  pgsql_endpoint_props(HostVar, PortVar, DefaultPort, EnableTls, ProxyName) ->
      Host = os:getenv(HostVar, "toxiproxy"),
      Port = list_to_integer(os:getenv(PortVar, DefaultPort)),
      [
          {pgsql_host, Host},
          {pgsql_port, Port},
          {enable_tls, EnableTls},
          {proxy_name, ProxyName}
      ].
  %% Per-group teardown. Only the variant groups (the ones whose setup went
  %% through common_init/1) have anything to clean up: the test table is
  %% dropped, the proxy is reset and the started applications are stopped.
  end_per_group(Group, Config) ->
      case lists:member(Group, [with_batch, without_batch, matrix, timescale]) of
          true ->
              StartedApps = ?config(apps, Config),
              PHost = ?config(proxy_host, Config),
              PPort = ?config(proxy_port, Config),
              connect_and_drop_table(Config),
              emqx_common_test_helpers:reset_proxy(PHost, PPort),
              ok = emqx_cth_suite:stop(StartedApps),
              ok;
          false ->
              ok
      end.
  %% Suite-level setup is a no-op; the config is passed through unchanged
  %% (the per-group callbacks do the actual preparation).
  init_per_suite(Config) -> Config.
  %% Suite-level teardown: nothing to release here.
  end_per_suite(_Config) -> ok.
  %% Before every test case: empty the test table, remove any bridge left
  %% behind, and start a fresh snabbkaffe trace.
  init_per_testcase(_Testcase, Config) ->
      connect_and_clear_table(Config),
      delete_bridge(Config),
      snabbkaffe:start_trace(),
      Config.
  %% After every test case: reset the proxy, empty the test table, stop
  %% snabbkaffe and remove the bridge.
  end_per_testcase(_Testcase, Config) ->
      PHost = ?config(proxy_host, Config),
      PPort = ?config(proxy_port, Config),
      emqx_common_test_helpers:reset_proxy(PHost, PPort),
      connect_and_clear_table(Config),
      ok = snabbkaffe:stop(),
      delete_bridge(Config),
      ok.
  125. %%------------------------------------------------------------------------------
  126. %% Helper fns
  127. %%------------------------------------------------------------------------------
  %% Shared setup for the variant groups.
  %%
  %% When the PostgreSQL endpoint is reachable: resets the proxy, starts the
  %% applications, creates the default API app, creates the test table and
  %% returns Config0 extended with `apps', `pgsql_config', `pgsql_bridge_type',
  %% `pgsql_name', `proxy_host' and `proxy_port'.
  %%
  %% When it is not reachable: throws `no_pgsql' if the IS_CI environment
  %% variable is "yes", otherwise returns `{skip, no_pgsql}'.
  common_init(Config0) ->
      BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
      PgHost = ?config(pgsql_host, Config0),
      PgPort = ?config(pgsql_port, Config0),
      case emqx_common_test_helpers:is_tcp_server_available(PgHost, PgPort) of
          true ->
              %% Proxy (toxiproxy) coordinates, overridable via the environment.
              ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
              ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
              emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
              AppSpecs = [
                  emqx,
                  emqx_conf,
                  emqx_connector,
                  emqx_bridge,
                  emqx_bridge_pgsql,
                  emqx_rule_engine,
                  emqx_management,
                  {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
              ],
              Apps = emqx_cth_suite:start(
                  AppSpecs,
                  #{work_dir => emqx_cth_suite:work_dir(Config0)}
              ),
              {ok, _Api} = emqx_common_test_http:create_default_app(),
              %% Talk to PostgreSQL directly to make sure the table exists.
              connect_and_create_table(Config0),
              {Name, PGConf} = pgsql_config(BridgeType, Config0),
              [
                  {apps, Apps},
                  {pgsql_config, PGConf},
                  {pgsql_bridge_type, BridgeType},
                  {pgsql_name, Name},
                  {proxy_host, ProxyHost},
                  {proxy_port, ProxyPort}
                  | Config0
              ];
          false ->
              pgsql_unavailable(os:getenv("IS_CI"))
      end.

  %% Decides what to do when PostgreSQL cannot be reached: fail when IS_CI is
  %% "yes", skip the group otherwise.
  pgsql_unavailable("yes") ->
      throw(no_pgsql);
  pgsql_unavailable(_NotCi) ->
      {skip, no_pgsql}.
  %% Renders a HOCON bridge definition for the given bridge type from the CT
  %% config (server address, batch size, query mode, TLS flag), runs it through
  %% parse_and_check/3 and returns `{Name, BridgeConfig}'. The bridge name is
  %% the module name as a binary.
  pgsql_config(BridgeType, Config) ->
      PortStr = integer_to_list(?config(pgsql_port, Config)),
      Server = ?config(pgsql_host, Config) ++ ":" ++ PortStr,
      Name = atom_to_binary(?MODULE),
      %% A batch size of 1 is used when batching is disabled.
      BatchSize =
          case ?config(enable_batch, Config) of
              true -> ?BATCH_SIZE;
              false -> 1
          end,
      QueryMode = ?config(query_mode, Config),
      TlsEnabled = ?config(enable_tls, Config),
      %% NOTE: the password is supplied through a file here, to verify that it works.
      Password = create_passfile(BridgeType, Config),
      Template =
          "bridges.~s.~s {"
          "\n enable = true"
          "\n server = ~p"
          "\n database = ~p"
          "\n username = ~p"
          "\n password = ~p"
          "\n sql = ~p"
          "\n resource_opts = {"
          "\n request_ttl = 500ms"
          "\n batch_size = ~b"
          "\n query_mode = ~s"
          "\n worker_pool_size = 1"
          "\n health_check_interval = 15s"
          "\n start_after_created = true"
          "\n start_timeout = 5s"
          "\n inflight_window = 100"
          "\n max_buffer_bytes = 256MB"
          "\n buffer_seg_bytes = 10MB"
          "\n buffer_mode = memory_only"
          "\n metrics_flush_interval = 5s"
          "\n resume_interval = 15s"
          "\n }"
          "\n ssl = {"
          "\n enable = ~w"
          "\n }"
          "\n }",
      %% One argument per placeholder, in template order.
      Args = [
          BridgeType,
          Name,
          Server,
          ?PGSQL_DATABASE,
          ?PGSQL_USERNAME,
          Password,
          ?SQL_BRIDGE,
          BatchSize,
          QueryMode,
          TlsEnabled
      ],
      ConfigString = io_lib:format(Template, Args),
      {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  %% Returns the INSERT template (?SQL_BRIDGE) used by the bridge under test.
  default_sql() -> ?SQL_BRIDGE.
  %% Writes the database password to "<BridgeType>.passfile" inside the test
  %% case's priv_dir and returns a "file://" reference to that file.
  create_passfile(BridgeType, Config) ->
      Basename = binary_to_list(BridgeType) ++ ".passfile",
      PrivDir = ?config(priv_dir, Config),
      Filepath = filename:join(PrivDir, Basename),
      ok = file:write_file(Filepath, ?PGSQL_PASSWORD),
      "file://" ++ Filepath.
  %% Parses the HOCON text into a raw map, runs it through the bridge schema
  %% check, and returns the raw (unchecked) config found under
  %% bridges.<BridgeType>.<Name>. The return value of the schema check itself
  %% is not used.
  parse_and_check(ConfigString, BridgeType, Name) ->
      {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
      CheckOpts = #{required => false, atom_key => false},
      hocon_tconf:check_plain(emqx_bridge_schema, RawConf, CheckOpts),
      #{<<"bridges">> := #{BridgeType := #{Name := BridgeConf}}} = RawConf,
      BridgeConf.
  %% Creates the bridge described by the CT config, without any overrides.
  create_bridge(Config) ->
      NoOverrides = #{},
      create_bridge(Config, NoOverrides).
  %% Creates the bridge from the `pgsql_config' entry of the CT config after
  %% deep-merging Overrides into it. Returns whatever emqx_bridge:create/3 returns.
  create_bridge(Config, Overrides) ->
      Type = ?config(pgsql_bridge_type, Config),
      BridgeName = ?config(pgsql_name, Config),
      BaseConf = ?config(pgsql_config, Config),
      emqx_bridge:create(
          Type,
          BridgeName,
          emqx_utils_maps:deep_merge(BaseConf, Overrides)
      ).
  %% Removes the bridge identified by the type and name stored in the CT config.
  delete_bridge(Config) ->
      Type = ?config(pgsql_bridge_type, Config),
      BridgeName = ?config(pgsql_name, Config),
      emqx_bridge:remove(Type, BridgeName).
  %% POSTs Params to the "bridges" management API endpoint. On success the
  %% response body is JSON-decoded into maps; any other result is returned as is.
  create_bridge_http(Params) ->
      Url = emqx_mgmt_api_test_util:api_path(["bridges"]),
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      case emqx_mgmt_api_test_util:request_api(post, Url, "", Auth, Params) of
          {ok, Body} ->
              {ok, emqx_utils_json:decode(Body, [return_maps])};
          Failure ->
              Failure
      end.
  %% Sends Payload through the bridge under test, addressed by its bridge id.
  send_message(Config, Payload) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      emqx_bridge:send_message(
          emqx_bridge_resource:bridge_id(Type, BridgeName),
          Payload
      ).
  %% Issues Request against the bridge via emqx_bridge_v2:query/4 with a
  %% one-second timeout.
  query_resource(Config, Request) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      QueryOpts = #{timeout => 1_000},
      emqx_bridge_v2:query(Type, BridgeName, Request, QueryOpts).
  %% Runs Request through emqx_resource_buffer_worker:simple_sync_query/2,
  %% addressed by the action id of the bridge under test.
  query_resource_sync(Config, Request) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      emqx_resource_buffer_worker:simple_sync_query(
          emqx_bridge_v2:id(Type, BridgeName),
          Request
      ).
  %% Asynchronous query with default options (see query_resource_async/3).
  query_resource_async(Config, Request) ->
      DefaultOpts = #{},
      query_resource_async(Config, Request, DefaultOpts).
  %% Issues Request with an async reply callback. The callback sends
  %% `{result, Ref, Result}' to a freshly created reply alias of the calling
  %% process. Opts may carry `timeout' (default 500).
  %% Returns `{QueryReturn, Ref}'; use receive_result/2 to collect the reply.
  query_resource_async(Config, Request, Opts) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      Ref = alias([reply]),
      ReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
      QueryOpts = #{
          timeout => maps:get(timeout, Opts, 500),
          async_reply_fun => {ReplyFun, []}
      },
      QueryReturn = emqx_bridge_v2:query(Type, BridgeName, Request, QueryOpts),
      {QueryReturn, Ref}.
  %% Waits up to Timeout for a reply tagged with Ref. Both `{result, Ref, Reply}'
  %% and `{Ref, Reply}' message shapes are accepted.
  %% Returns `{ok, Reply}', or `timeout' when nothing arrives in time.
  receive_result(Ref, Timeout) ->
      receive
          {Ref, Reply} -> {ok, Reply};
          {result, Ref, Reply} -> {ok, Reply}
      after Timeout -> timeout
      end.
  %% Opens a direct epgsql connection (bypassing the bridge) to the server named
  %% in the CT config, adding client TLS options when `enable_tls' is true.
  %% Returns the connection; crashes if connecting fails.
  connect_direct_pgsql(Config) ->
      BaseOpts = #{
          host => ?config(pgsql_host, Config),
          port => ?config(pgsql_port, Config),
          username => ?PGSQL_USERNAME,
          password => ?PGSQL_PASSWORD,
          database => ?PGSQL_DATABASE
      },
      ConnOpts =
          case ?config(enable_tls, Config) of
              false ->
                  BaseOpts;
              true ->
                  TlsOpts = #{
                      ssl => true,
                      ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
                  },
                  maps:merge(BaseOpts, TlsOpts)
          end,
      {ok, Conn} = epgsql:connect(ConnOpts),
      Conn.
  317. % These funs connect and then stop the pgsql connection
  %% Connects directly, creates the test table if it does not exist yet, and
  %% closes the connection again.
  connect_and_create_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _, _} = epgsql:squery(Conn, ?SQL_CREATE_TABLE),
      ok = epgsql:close(Conn).
  %% Connects directly, drops the test table, and closes the connection again.
  connect_and_drop_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _, _} = epgsql:squery(Conn, ?SQL_DROP_TABLE),
      ok = epgsql:close(Conn).
  %% Connects directly, deletes every row from the test table, and closes the
  %% connection. The table is (re)created first; the outcome of that statement
  %% is deliberately not checked.
  connect_and_clear_table(Config) ->
      Conn = connect_direct_pgsql(Config),
      _ = epgsql:squery(Conn, ?SQL_CREATE_TABLE),
      {ok, _} = epgsql:squery(Conn, ?SQL_DELETE),
      ok = epgsql:close(Conn).
  %% Connects directly and returns the payload of the single row in the test
  %% table. Crashes unless the table holds exactly one row.
  connect_and_get_payload(Config) ->
      Conn = connect_direct_pgsql(Config),
      {ok, _, [{Payload}]} = epgsql:squery(Conn, ?SQL_SELECT),
      ok = epgsql:close(Conn),
      Payload.
  336. %%------------------------------------------------------------------------------
  337. %% Testcases
  338. %%------------------------------------------------------------------------------
  %% Creates the bridge from config, sends one message through it and verifies
  %% that the payload reached the table and that exactly one
  %% `pgsql_connector_query_return' event with the expected result was traced.
  t_setup_via_config_and_publish(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {_, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              %% The row must be visible through a direct connection.
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(FullTrace) ->
              QueryReturns = ?of_kind(pgsql_connector_query_return, FullTrace),
              %% Batched and unbatched queries report their result differently.
              case ?config(enable_batch, Config) of
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], QueryReturns);
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], QueryReturns)
              end,
              ok
          end
      ),
      ok.
  %% Same scenario as the config-based publish test, but the bridge is created
  %% through the HTTP API, using a literal password instead of a password file.
  t_setup_via_http_api_and_publish(Config) ->
      Type = ?config(pgsql_bridge_type, Config),
      BridgeName = ?config(pgsql_name, Config),
      BaseConf = ?config(pgsql_config, Config),
      QueryMode = ?config(query_mode, Config),
      ApiParams = maps:merge(BaseConf, #{
          <<"name">> => BridgeName,
          <<"type">> => Type,
          %% NOTE: using literal passwords with HTTP API requests.
          <<"password">> => <<?PGSQL_PASSWORD>>
      }),
      ?assertMatch({ok, _}, create_bridge_http(ApiParams)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {SendResult, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              %% Only the synchronous mode returns the query result directly.
              case QueryMode of
                  sync ->
                      ?assertEqual({ok, 1}, SendResult);
                  async ->
                      ok
              end,
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(FullTrace) ->
              QueryReturns = ?of_kind(pgsql_connector_query_return, FullTrace),
              case ?config(enable_batch, Config) of
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], QueryReturns);
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], QueryReturns)
              end,
              ok
          end
      ),
      ok.
  %% The health check reports `connected' while the database is reachable and
  %% `disconnected' or `connecting' while the proxy is down.
  t_get_status(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      PPort = ?config(proxy_port, Config),
      PHost = ?config(proxy_host, Config),
      PName = ?config(proxy_name, Config),
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(Type, BridgeName)),
      WhileDown = fun() ->
          ?assertMatch(
              #{status := Status} when Status =:= disconnected orelse Status =:= connecting,
              emqx_bridge_v2:health_check(Type, BridgeName)
          )
      end,
      emqx_common_test_helpers:with_failure(down, PName, PHost, PPort, WhileDown),
      ok.
  %% Creating the bridge while the proxy is down still returns `{ok, _}', and
  %% exactly one `pgsql_connector_start_failed' event carrying a
  %% `start_pool_failed' error is traced.
  t_create_disconnected(Config) ->
      PPort = ?config(proxy_port, Config),
      PHost = ?config(proxy_host, Config),
      PName = ?config(proxy_name, Config),
      ?check_trace(
          emqx_common_test_helpers:with_failure(down, PName, PHost, PPort, fun() ->
              ?assertMatch({ok, _}, create_bridge(Config))
          end),
          fun(FullTrace) ->
              StartFailures = ?of_kind(pgsql_connector_start_failed, FullTrace),
              ?assertMatch([#{error := {start_pool_failed, _, _}}], StartFailures),
              ok
          end
      ),
      ok.
  %% Sends a message while the proxy is down and expects the buffer worker to
  %% emit a `buffer_worker_flush_nack' event whose error is either a
  %% `resource_error' or `{recoverable_error, disconnected}'.
  t_write_failure(Config) ->
      PName = ?config(proxy_name, Config),
      PPort = ?config(proxy_port, Config),
      PHost = ?config(proxy_host, Config),
      QueryMode = ?config(query_mode, Config),
      {ok, _} = create_bridge(Config),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          emqx_common_test_helpers:with_failure(down, PName, PHost, PPort, fun() ->
              {_, {ok, _}} =
                  ?wait_async_action(
                      %% In sync mode the send itself must already report an error.
                      case QueryMode of
                          sync ->
                              ?assertMatch({error, _}, send_message(Config, Message));
                          async ->
                              send_message(Config, Message)
                      end,
                      #{?snk_kind := buffer_worker_flush_nack},
                      15_000
                  )
          end),
          fun(FullTrace) ->
              ct:pal("trace: ~p", [FullTrace]),
              Nacks = ?of_kind(buffer_worker_flush_nack, FullTrace),
              ?assertMatch([#{result := {error, _}} | _], Nacks),
              [#{result := {error, Reason}} | _] = Nacks,
              case Reason of
                  {resource_error, _} ->
                      ok;
                  {recoverable_error, disconnected} ->
                      ok;
                  _ ->
                      ct:fail("unexpected error: ~p", [Reason])
              end
          end
      ),
      ok.
  494. % This test doesn't work with batch enabled since it is not possible
  495. % to set the timeout directly for batch queries
  %% Issues a query while the proxy injects a `timeout' failure.
  %% In sync mode the query must fail with a `timeout' resource error. In async
  %% mode the reply is awaited afterwards and may be either an unrecoverable
  %% error or a success.
  t_write_timeout(Config) ->
      PName = ?config(proxy_name, Config),
      PPort = ?config(proxy_port, Config),
      PHost = ?config(proxy_host, Config),
      QueryMode = ?config(query_mode, Config),
      %% Short intervals, overriding the 15s values from the base config.
      Overrides = #{
          <<"resource_opts">> => #{
              <<"resume_interval">> => <<"100ms">>,
              <<"health_check_interval">> => <<"100ms">>
          }
      },
      {ok, _} = create_bridge(Config, Overrides),
      Payload = integer_to_binary(erlang:unique_integer()),
      Request = {send_message, #{payload => Payload, timestamp => 1668602148000}},
      {ok, SubRef} = snabbkaffe:subscribe(
          ?match_event(#{?snk_kind := call_query_enter}),
          2_000
      ),
      QueryOutcome =
          emqx_common_test_helpers:with_failure(timeout, PName, PHost, PPort, fun() ->
              Outcome =
                  case QueryMode of
                      sync ->
                          query_resource(Config, Request);
                      async ->
                          query_resource_async(Config, Request, #{timeout => 60_000})
                  end,
              %% Make sure the query actually entered the resource before healing.
              ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SubRef)),
              Outcome
          end),
      case QueryOutcome of
          {_, ReplyRef} when is_reference(ReplyRef) ->
              case receive_result(ReplyRef, 15_000) of
                  {ok, Reply} ->
                      %% we may receive a successful result depending on
                      %% timing, if the request is retried after the
                      %% failure is healed.
                      case Reply of
                          {error, {unrecoverable_error, _}} ->
                              ok;
                          {ok, _} ->
                              ok;
                          _ ->
                              ct:fail("unexpected result: ~p", [Reply])
                      end;
                  timeout ->
                      ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
                      ct:fail("no response received")
              end;
          _ ->
              ?assertMatch({error, {resource_error, #{reason := timeout}}}, QueryOutcome)
      end,
      ok.
  %% Runs a raw `{sql, ...}' request through the bridge. Without batching the
  %% query result is returned; with batching the request is rejected with
  %% `batch_prepare_not_implemented'.
  t_simple_sql_query(Config) ->
      EnableBatch = ?config(enable_batch, Config),
      QueryMode = ?config(query_mode, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"SELECT count(1) AS T">>},
      Outcome =
          case QueryMode of
              async ->
                  {_, ReplyRef} = query_resource_async(Config, Request),
                  {ok, Reply} = receive_result(ReplyRef, 2_000),
                  Reply;
              sync ->
                  query_resource(Config, Request)
          end,
      case EnableBatch of
          false ->
              ?assertMatch({ok, _, [{1}]}, Outcome);
          true ->
              ?assertEqual(
                  {error, {unrecoverable_error, batch_prepare_not_implemented}},
                  Outcome
              )
      end,
      ok.
  %% Sending an empty message violates the NOT NULL constraint on `arrived';
  %% the flush must be acked with an unrecoverable `not_null_violation' error
  %% (error code 23502).
  t_missing_data(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      EmptyMessage = #{},
      {_, {ok, AckEvent}} =
          ?wait_async_action(
              send_message(Config, EmptyMessage),
              #{?snk_kind := buffer_worker_flush_ack},
              2_000
          ),
      ?assertMatch(
          #{
              result :=
                  {error,
                      {unrecoverable_error, #{
                          error_code := <<"23502">>,
                          error_codename := not_null_violation,
                          severity := error
                      }}}
          },
          AckEvent
      ),
      ok.
  %% A malformed `{sql, Sql, Params}' request must produce an unrecoverable
  %% error; with batching enabled it is specifically `invalid_request'.
  t_bad_sql_parameter(Config) ->
      QueryMode = ?config(query_mode, Config),
      EnableBatch = ?config(enable_batch, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {sql, <<"">>, [bad_parameter]},
      Outcome =
          case QueryMode of
              async ->
                  {_, ReplyRef} = query_resource_async(Config, Request),
                  {ok, Reply} = receive_result(ReplyRef, 2_000),
                  Reply;
              sync ->
                  query_resource(Config, Request)
          end,
      case EnableBatch of
          false ->
              ?assertMatch({error, {unrecoverable_error, _}}, Outcome);
          true ->
              ?assertEqual({error, {unrecoverable_error, invalid_request}}, Outcome)
      end,
      ok.
  %% A payload made of every byte from 1 to 127 (quotes, control characters, ...)
  %% must be stored and read back unchanged.
  t_nasty_sql_string(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      NastyPayload = list_to_binary(lists:seq(1, 127)),
      Message = #{
          payload => NastyPayload,
          timestamp => erlang:system_time(millisecond)
      },
      {_, {ok, _}} =
          ?wait_async_action(
              send_message(Config, Message),
              #{?snk_kind := pgsql_connector_query_return},
              1_000
          ),
      ?assertEqual(NastyPayload, connect_and_get_payload(Config)).
  %% With the target table dropped before the bridge is created, the bridge must
  %% end up `connecting' or `disconnected', queries must fail with
  %% `unhealthy_target', and a `pgsql_undefined_table' event must be traced.
  %% The table is recreated afterwards.
  t_missing_table(Config) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      ?check_trace(
          begin
              connect_and_drop_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertMatch(
                      #{status := Status} when Status == connecting orelse Status == disconnected,
                      emqx_bridge_v2:health_check(Type, BridgeName)
                  )
              ),
              Payload = integer_to_binary(erlang:unique_integer()),
              Message = #{payload => Payload, timestamp => 1668602148000},
              ?assertMatch(
                  {error, {resource_error, #{reason := unhealthy_target}}},
                  query_resource(Config, {send_message, Message})
              ),
              ok
          end,
          fun(FullTrace) ->
              ?assertMatch([_ | _], ?of_kind(pgsql_undefined_table, FullTrace)),
              ok
          end
      ),
      connect_and_create_table(Config),
      ok.
  %% Drops the table after the bridge has become `connected'; a subsequent
  %% synchronous query must fail with an unrecoverable error or a
  %% `not_connected' resource error. The table is recreated afterwards.
  t_table_removed(Config) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      ?check_trace(
          begin
              connect_and_create_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 100,
                  _Attempts = 200,
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(Type, BridgeName)
                  )
              ),
              connect_and_drop_table(Config),
              Payload = integer_to_binary(erlang:unique_integer()),
              Message = #{payload => Payload, timestamp => 1668602148000},
              ActionId = emqx_bridge_v2:id(Type, BridgeName),
              case query_resource_sync(Config, {ActionId, Message}) of
                  {error, {unrecoverable_error, _}} ->
                      ok;
                  ?RESOURCE_ERROR_M(not_connected, _) ->
                      ok;
                  Unexpected ->
                      ct:fail("unexpected result: ~p", [Unexpected])
              end,
              ok
          end,
          %% No trace checks for this case.
          []
      ),
      connect_and_create_table(Config),
      ok.
  %% Runs 20 health checks in parallel against a connected bridge; all must
  %% report `connected' and no `postgres_connector_bad_parse2' event may be traced.
  t_concurrent_health_checks(Config) ->
      BridgeName = ?config(pgsql_name, Config),
      Type = ?config(pgsql_bridge_type, Config),
      ?check_trace(
          begin
              connect_and_create_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(Type, BridgeName)
                  )
              ),
              CheckOnce = fun(_) ->
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(Type, BridgeName)
                  )
              end,
              emqx_utils:pmap(CheckOnce, lists:seq(1, 20)),
              ok
          end,
          fun(FullTrace) ->
              ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, FullTrace)),
              ok
          end
      ),
      ok.