emqx_bridge_pgsql_SUITE.erl 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include("../../emqx_resource/include/emqx_resource_errors.hrl").
  %% SQL statements: ?SQL_BRIDGE is the template configured on the bridge
  %% (it contains ${...} placeholders); the others are issued by the suite
  %% itself over a direct connection.
  -define(SQL_BRIDGE,
      "INSERT INTO mqtt_test(payload, arrived) VALUES "
      "(${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  ).
  -define(SQL_CREATE_TABLE,
      "CREATE TABLE IF NOT EXISTS mqtt_test "
      "(payload text, arrived timestamp NOT NULL) "
  ).
  -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  -define(SQL_DELETE, "DELETE from mqtt_test").
  -define(SQL_SELECT, "SELECT payload FROM mqtt_test").

  %% Database name and credentials used by both the bridge and the direct
  %% test connection.
  -define(PGSQL_DATABASE, "mqtt").
  -define(PGSQL_USERNAME, "root").
  -define(PGSQL_PASSWORD, "public").

  %% Batch size configured on the bridge when the `enable_batch' flag is true.
  -define(BATCH_SIZE, 10).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  %% Top-level CT entry point: the suite is made of two groups, `tcp' and `tls'.
  all() ->
      [{group, Transport} || Transport <- [tcp, tls]].
  %% CT group tree: transport (tcp | tls) -> query mode (async | sync) ->
  %% batch variant (with_batch | without_batch | matrix | timescale).
  %% `t_write_timeout' is excluded from the `with_batch' group, and the
  %% `matrix' / `timescale' groups only run the two setup-and-publish cases.
  groups() ->
      AllCases = emqx_common_test_helpers:all(?MODULE),
      SmokeCases = [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish],
      BatchVariants = [{group, G} || G <- [with_batch, without_batch, matrix, timescale]],
      QueryModes = [{Mode, BatchVariants} || Mode <- [async, sync]],
      Transports = [{Transport, QueryModes} || Transport <- [tcp, tls]],
      Transports ++ QueryModes ++
          [
              {with_batch, AllCases -- [t_write_timeout]},
              {without_batch, AllCases},
              {matrix, SmokeCases},
              {timescale, SmokeCases}
          ].
  %% Per-group setup.
  %%  * tcp / tls: record the server endpoint (overridable through the
  %%    PGSQL_* environment variables), the TLS flag and the proxy name.
  %%  * async / sync: record the query mode.
  %%  * with_batch / without_batch / matrix / timescale: record the batch flag
  %%    (and the bridge type for matrix / timescale), then run common_init/1.
  %%  * any other group: the config is returned untouched.
  init_per_group(tcp, Config) ->
      TcpPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
      TcpHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
      Endpoint = [
          {pgsql_host, TcpHost},
          {pgsql_port, TcpPort},
          {enable_tls, false},
          {proxy_name, "pgsql_tcp"}
      ],
      Endpoint ++ Config;
  init_per_group(tls, Config) ->
      TlsPort = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
      TlsHost = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
      Endpoint = [
          {pgsql_host, TlsHost},
          {pgsql_port, TlsPort},
          {enable_tls, true},
          {proxy_name, "pgsql_tls"}
      ],
      Endpoint ++ Config;
  init_per_group(async, Config) ->
      [{query_mode, async} | Config];
  init_per_group(sync, Config) ->
      [{query_mode, sync} | Config];
  init_per_group(with_batch, Config) ->
      common_init([{enable_batch, true} | Config]);
  init_per_group(without_batch, Config) ->
      common_init([{enable_batch, false} | Config]);
  init_per_group(matrix, Config) ->
      common_init([{bridge_type, <<"matrix">>}, {enable_batch, true} | Config]);
  init_per_group(timescale, Config) ->
      common_init([{bridge_type, <<"timescale">>}, {enable_batch, true} | Config]);
  init_per_group(_Other, Config) ->
      Config.
  %% Per-group teardown. Only the groups that went through common_init/1 have
  %% something to undo: the test table is dropped, the proxy is reset and the
  %% started applications are stopped. Every other group is a no-op.
  end_per_group(Group, Config) ->
      case lists:member(Group, [with_batch, without_batch, matrix, timescale]) of
          true ->
              StartedApps = ?config(apps, Config),
              ProxyHost = ?config(proxy_host, Config),
              ProxyPort = ?config(proxy_port, Config),
              connect_and_drop_table(Config),
              emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
              ok = emqx_cth_suite:stop(StartedApps),
              ok;
          false ->
              ok
      end.
  %% Suite-level setup is a no-op: the real setup happens per group.
  init_per_suite(SuiteConfig) ->
      SuiteConfig.
  %% Suite-level teardown is a no-op: the real teardown happens per group.
  end_per_suite(_SuiteConfig) ->
      ok.
  %% Every test case starts from an empty table, with no bridge configured
  %% and with a fresh snabbkaffe trace collector running.
  init_per_testcase(_Case, Config) ->
      connect_and_clear_table(Config),
      _ = delete_bridge(Config),
      _ = snabbkaffe:start_trace(),
      Config.
  %% Undo whatever the test case did: reset the proxy first (so that the
  %% direct connection below can reach the database), empty the table, stop
  %% tracing, remove the bridge and finally run the registered on-exit hooks.
  end_per_testcase(_Case, Config) ->
      emqx_common_test_helpers:reset_proxy(
          ?config(proxy_host, Config),
          ?config(proxy_port, Config)
      ),
      connect_and_clear_table(Config),
      ok = snabbkaffe:stop(),
      _ = delete_bridge(Config),
      _ = emqx_common_test_helpers:call_janitor(),
      ok.
  126. %%------------------------------------------------------------------------------
  127. %% Helper fns
  128. %%------------------------------------------------------------------------------
  %% Shared setup for the batch-variant groups.
  %%
  %% When the PostgreSQL endpoint is reachable: resets the proxy, starts the
  %% required applications, creates the default API app and the test table,
  %% and returns Config0 extended with the bridge name/type/config, the
  %% started apps and the proxy coordinates.
  %%
  %% When it is not reachable: throws `no_pgsql' if the IS_CI environment
  %% variable is "yes", otherwise returns `{skip, no_pgsql}'.
  common_init(Config0) ->
      BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
      Host = ?config(pgsql_host, Config0),
      Port = ?config(pgsql_port, Config0),
      case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
          false ->
              %% On CI a missing database is a hard failure, locally it is a skip.
              os:getenv("IS_CI") =:= "yes" andalso throw(no_pgsql),
              {skip, no_pgsql};
          true ->
              %% Bring the proxy back to a clean state.
              ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
              ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
              emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
              AppSpecs = [
                  emqx,
                  emqx_conf,
                  emqx_connector,
                  emqx_bridge,
                  emqx_bridge_pgsql,
                  emqx_rule_engine,
                  emqx_management,
                  {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
              ],
              StartedApps = emqx_cth_suite:start(
                  AppSpecs,
                  #{work_dir => emqx_cth_suite:work_dir(Config0)}
              ),
              {ok, _Api} = emqx_common_test_http:create_default_app(),
              %% The table is created over a direct connection, not via the bridge.
              connect_and_create_table(Config0),
              {BridgeName, BridgeConf} = pgsql_config(BridgeType, Config0),
              [
                  {apps, StartedApps},
                  {pgsql_config, BridgeConf},
                  {pgsql_bridge_type, BridgeType},
                  {pgsql_name, BridgeName},
                  {proxy_host, ProxyHost},
                  {proxy_port, ProxyPort}
                  | Config0
              ]
      end.
  %% Renders the HOCON configuration of the bridge under test from the group
  %% settings found in Config (endpoint, batch flag, query mode, TLS flag) and
  %% returns `{Name, ParsedBridgeConfig}'. The bridge is named after this module.
  pgsql_config(BridgeType, Config) ->
      PortStr = integer_to_list(?config(pgsql_port, Config)),
      Server = lists:append([?config(pgsql_host, Config), ":", PortStr]),
      Name = atom_to_binary(?MODULE),
      %% A batch size of 1 is how batching is turned off.
      BatchSize =
          case ?config(enable_batch, Config) of
              false -> 1;
              true -> ?BATCH_SIZE
          end,
      QueryMode = ?config(query_mode, Config),
      TlsEnabled = ?config(enable_tls, Config),
      %% NOTE: the password is supplied through a file here, to verify that
      %% file-based secrets work.
      Password = create_passfile(BridgeType, Config),
      Template =
          "bridges.~s.~s {"
          "\n enable = true"
          "\n server = ~p"
          "\n database = ~p"
          "\n username = ~p"
          "\n password = ~p"
          "\n sql = ~p"
          "\n resource_opts = {"
          "\n request_ttl = 500ms"
          "\n batch_size = ~b"
          "\n query_mode = ~s"
          "\n worker_pool_size = 1"
          "\n health_check_interval = 15s"
          "\n start_after_created = true"
          "\n start_timeout = 5s"
          "\n inflight_window = 100"
          "\n max_buffer_bytes = 256MB"
          "\n buffer_seg_bytes = 10MB"
          "\n buffer_mode = memory_only"
          "\n metrics_flush_interval = 5s"
          "\n resume_interval = 15s"
          "\n }"
          "\n ssl = {"
          "\n enable = ~w"
          "\n }"
          "\n }",
      Args = [
          BridgeType,
          Name,
          Server,
          ?PGSQL_DATABASE,
          ?PGSQL_USERNAME,
          Password,
          ?SQL_BRIDGE,
          BatchSize,
          QueryMode,
          TlsEnabled
      ],
      ConfigString = io_lib:format(Template, Args),
      {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  %% The SQL template the bridge is configured with (see ?SQL_BRIDGE).
  default_sql() ->
      ?SQL_BRIDGE.
  %% Writes the database password to `<BridgeType>.passfile' inside the test
  %% case's priv_dir and returns a "file://" reference to that file.
  create_passfile(BridgeType, Config) ->
      BaseName = binary_to_list(BridgeType) ++ ".passfile",
      PassFile = filename:join(?config(priv_dir, Config), BaseName),
      ok = file:write_file(PassFile, ?PGSQL_PASSWORD),
      "file://" ++ PassFile.
  %% Parses the HOCON text, runs it through the bridge schema check (whose
  %% return value is discarded), and returns the raw, binary-keyed config
  %% found under bridges.<BridgeType>.<Name>.
  parse_and_check(ConfigString, BridgeType, Name) ->
      {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
      CheckOpts = #{required => false, atom_key => false},
      _ = hocon_tconf:check_plain(emqx_bridge_schema, RawConf, CheckOpts),
      #{<<"bridges">> := #{BridgeType := #{Name := BridgeConf}}} = RawConf,
      BridgeConf.
  %% Creates the bridge under test with the configuration stored in Config.
  create_bridge(Config) ->
      create_bridge(Config, #{}).

  %% Same as create_bridge/1, with Overrides deep-merged on top of the stored
  %% bridge configuration.
  create_bridge(Config, Overrides) ->
      BridgeConf = emqx_utils_maps:deep_merge(?config(pgsql_config, Config), Overrides),
      emqx_bridge:create(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config),
          BridgeConf
      ).
  %% Removes the bridge under test; returns whatever emqx_bridge:remove/2 returns.
  delete_bridge(Config) ->
      emqx_bridge:remove(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ).
  %% Creates a bridge by POSTing Params to the "bridges" HTTP API endpoint.
  %% On success the response body is decoded into a map; any other result
  %% is returned unchanged.
  create_bridge_http(Params) ->
      Response = emqx_mgmt_api_test_util:request_api(
          post,
          emqx_mgmt_api_test_util:api_path(["bridges"]),
          "",
          emqx_mgmt_api_test_util:auth_header_(),
          Params
      ),
      case Response of
          {ok, Body} ->
              {ok, emqx_utils_json:decode(Body, [return_maps])};
          Failure ->
              Failure
      end.
  %% Sends Payload through the bridge under test, addressed by its bridge id.
  send_message(Config, Payload) ->
      BridgeId = emqx_bridge_resource:bridge_id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      emqx_bridge:send_message(BridgeId, Payload).
  %% Issues Request against the bridge under test via emqx_bridge_v2:query/4
  %% with a 1000 ms timeout.
  query_resource(Config, Request) ->
      QueryOpts = #{timeout => 1_000},
      emqx_bridge_v2:query(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config),
          Request,
          QueryOpts
      ).
  %% Issues Request through emqx_resource_buffer_worker:simple_sync_query/2,
  %% addressed by the action id of the bridge under test.
  query_resource_sync(Config, Request) ->
      ActionId = emqx_bridge_v2:id(
          ?config(pgsql_bridge_type, Config),
          ?config(pgsql_name, Config)
      ),
      emqx_resource_buffer_worker:simple_sync_query(ActionId, Request).
  %% Asynchronous query with default options.
  query_resource_async(Config, Request) ->
      query_resource_async(Config, Request, #{}).

  %% Issues Request with an async reply fun that forwards the result to the
  %% calling process as `{result, Alias, Result}'. Returns `{Return, Alias}',
  %% where Return is the immediate return value of emqx_bridge_v2:query/4 and
  %% Alias is what receive_result/2 should be given to collect the reply.
  %% Opts may carry `timeout' (milliseconds, default 500).
  query_resource_async(Config, Request, Opts) ->
      Name = ?config(pgsql_name, Config),
      BridgeType = ?config(pgsql_bridge_type, Config),
      ReplyAlias = alias([reply]),
      ReplyFun = fun(Result) -> ReplyAlias ! {result, ReplyAlias, Result} end,
      QueryOpts = #{
          timeout => maps:get(timeout, Opts, 500),
          async_reply_fun => {ReplyFun, []}
      },
      Return = emqx_bridge_v2:query(BridgeType, Name, Request, QueryOpts),
      {Return, ReplyAlias}.
  %% Waits up to Timeout milliseconds for a reply tagged with Ref, accepting
  %% both the `{result, Ref, Result}' and the `{Ref, Result}' shapes.
  %% Returns `{ok, Result}', or `timeout' when nothing matching arrives.
  receive_result(Ref, Timeout) ->
      receive
          {result, Tag, Tagged} when Tag =:= Ref ->
              {ok, Tagged};
          {Tag, Bare} when Tag =:= Ref ->
              {ok, Bare}
      after Timeout ->
          timeout
      end.
  %% Opens a direct epgsql connection (not going through the bridge) to the
  %% endpoint stored in Config, with TLS when the group enables it.
  %% Returns the connection; the caller is responsible for closing it.
  connect_direct_pgsql(Config) ->
      BaseOpts = #{
          host => ?config(pgsql_host, Config),
          port => ?config(pgsql_port, Config),
          username => ?PGSQL_USERNAME,
          password => ?PGSQL_PASSWORD,
          database => ?PGSQL_DATABASE
      },
      ConnOpts =
          case ?config(enable_tls, Config) of
              false ->
                  BaseOpts;
              true ->
                  TlsOpts = emqx_tls_lib:to_client_opts(#{enable => true}),
                  BaseOpts#{ssl => true, ssl_opts => TlsOpts}
          end,
      {ok, Conn} = epgsql:connect(ConnOpts),
      Conn.
  %% The connect_and_* helpers open a direct pgsql connection, run one
  %% statement and close the connection again.

  %% Creates the test table if it does not exist yet. Returns `ok'.
  %% The connection is closed in an `after' clause so that it is not leaked
  %% when the result of the statement does not match.
  connect_and_create_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Drops the test table over a direct connection. Returns `ok'; crashes
  %% with badmatch when the statement fails (e.g. the table does not exist).
  %% The connection is closed in an `after' clause so that such a failure
  %% does not leak it.
  connect_and_drop_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Makes sure the test table exists and is empty. Returns `ok'.
  %% The result of the CREATE statement is deliberately ignored; only the
  %% DELETE has to succeed. The connection is closed in an `after' clause so
  %% that a failing DELETE does not leak it.
  connect_and_clear_table(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          _ = epgsql:squery(Con, ?SQL_CREATE_TABLE),
          {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
          ok
      after
          ok = epgsql:close(Con)
      end.
  %% Reads the payload column of the test table over a direct connection and
  %% returns it. Crashes with badmatch unless the table holds exactly one row,
  %% which is the common failure mode of the assertions using this helper;
  %% the connection is therefore closed in an `after' clause so that a failed
  %% assertion does not leak it.
  connect_and_get_payload(Config) ->
      Con = connect_direct_pgsql(Config),
      try
          {ok, _, [{Payload}]} = epgsql:squery(Con, ?SQL_SELECT),
          Payload
      after
          ok = epgsql:close(Con)
      end.
  337. %%------------------------------------------------------------------------------
  338. %% Testcases
  339. %%------------------------------------------------------------------------------
  %% Creates the bridge from its configuration, publishes one message through
  %% it and checks that the payload lands in the table and that exactly one
  %% connector query return event (with one inserted row) was traced.
  t_setup_via_config_and_publish(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {_, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(Trace) ->
              Returns = ?of_kind(pgsql_connector_query_return, Trace),
              %% The shape of the traced result differs between batch and
              %% single-query mode.
              case ?config(enable_batch, Config) of
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], Returns);
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], Returns)
              end,
              ok
          end
      ),
      ok.
  %% Same scenario as the config-based variant, but the bridge is created
  %% through the HTTP API, with a literal password instead of a password file.
  t_setup_via_http_api_and_publish(Config) ->
      QueryMode = ?config(query_mode, Config),
      BaseConf = ?config(pgsql_config, Config),
      HttpParams = BaseConf#{
          <<"name">> => ?config(pgsql_name, Config),
          <<"type">> => ?config(pgsql_bridge_type, Config),
          %% NOTE: using literal passwords with HTTP API requests.
          <<"password">> => <<?PGSQL_PASSWORD>>
      },
      ?assertMatch({ok, _}, create_bridge_http(HttpParams)),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      ?check_trace(
          begin
              {SendResult, {ok, _}} =
                  ?wait_async_action(
                      send_message(Config, Message),
                      #{?snk_kind := pgsql_connector_query_return},
                      10_000
                  ),
              %% Only the synchronous mode hands the query result back to the sender.
              case QueryMode of
                  sync ->
                      ?assertEqual({ok, 1}, SendResult);
                  async ->
                      ok
              end,
              ?assertMatch(Payload, connect_and_get_payload(Config)),
              ok
          end,
          fun(Trace) ->
              Returns = ?of_kind(pgsql_connector_query_return, Trace),
              case ?config(enable_batch, Config) of
                  false ->
                      ?assertMatch([#{result := {ok, 1}}], Returns);
                  true ->
                      ?assertMatch([#{result := {_, [{ok, 1}]}}], Returns)
              end,
              ok
          end
      ),
      ok.
  %% The health check reports `connected' while the database is reachable and
  %% `disconnected' or `connecting' while the proxy is down.
  t_get_status(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      ProxyName = ?config(proxy_name, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
      CheckWhileDown = fun() ->
          ?assertMatch(
              #{status := Status} when Status =:= disconnected orelse Status =:= connecting,
              emqx_bridge_v2:health_check(BridgeType, Name)
          )
      end,
      emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, CheckWhileDown),
      ok.
  %% Creating the bridge while the database is unreachable still succeeds,
  %% and exactly one `pgsql_connector_start_failed' event carrying a
  %% `start_pool_failed' error is traced.
  t_create_disconnected(Config) ->
      ProxyName = ?config(proxy_name, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      CreateWhileDown = fun() ->
          ?assertMatch({ok, _}, create_bridge(Config))
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down, ProxyName, ProxyHost, ProxyPort, CreateWhileDown
          ),
          fun(Trace) ->
              StartFailures = ?of_kind(pgsql_connector_start_failed, Trace),
              ?assertMatch([#{error := {start_pool_failed, _, _}}], StartFailures),
              ok
          end
      ),
      ok.
  %% Publishing while the proxy is down must produce a buffer worker flush
  %% nack whose error is either a resource error or a recoverable
  %% `disconnected' error. In sync mode the sender also gets an error back.
  t_write_failure(Config) ->
      QueryMode = ?config(query_mode, Config),
      ProxyName = ?config(proxy_name, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      {ok, _} = create_bridge(Config),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      SendWhileDown = fun() ->
          {_, {ok, _}} =
              ?wait_async_action(
                  case QueryMode of
                      async ->
                          send_message(Config, Message);
                      sync ->
                          ?assertMatch({error, _}, send_message(Config, Message))
                  end,
                  #{?snk_kind := buffer_worker_flush_nack},
                  15_000
              )
      end,
      ?check_trace(
          emqx_common_test_helpers:with_failure(
              down, ProxyName, ProxyHost, ProxyPort, SendWhileDown
          ),
          fun(Trace) ->
              Nacks = ?of_kind(buffer_worker_flush_nack, Trace),
              ?assertMatch([#{result := {error, _}} | _], Nacks),
              [#{result := {error, Reason}} | _] = Nacks,
              case Reason of
                  {recoverable_error, disconnected} ->
                      ok;
                  {resource_error, _} ->
                      ok;
                  _ ->
                      ct:fail("unexpected error: ~p", [Reason])
              end
          end
      ),
      ok.
  %% This test doesn't work with batch enabled since it is not possible
  %% to set the timeout directly for batch queries.
  %%
  %% A query is issued while the proxy injects a `timeout' failure. In sync
  %% mode the caller must get a resource error with reason `timeout'. In async
  %% mode the reply arrives later and may be either an unrecoverable error or,
  %% depending on timing, a success.
  t_write_timeout(Config) ->
      QueryMode = ?config(query_mode, Config),
      ProxyName = ?config(proxy_name, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyPort = ?config(proxy_port, Config),
      FastRecovery = #{
          <<"resource_opts">> => #{
              <<"resume_interval">> => <<"100ms">>,
              <<"health_check_interval">> => <<"100ms">>
          }
      },
      {ok, _} = create_bridge(Config, FastRecovery),
      Payload = integer_to_binary(erlang:unique_integer()),
      Message = #{payload => Payload, timestamp => 1668602148000},
      {ok, Subscription} = snabbkaffe:subscribe(
          ?match_event(#{?snk_kind := call_query_enter}),
          2_000
      ),
      QueryUnderFailure = fun() ->
          Outcome =
              case QueryMode of
                  sync ->
                      query_resource(Config, {send_message, Message});
                  async ->
                      query_resource_async(
                          Config, {send_message, Message}, #{timeout => 60_000}
                      )
              end,
          %% Do not heal the failure before the query actually entered the resource.
          ?assertMatch({ok, [_]}, snabbkaffe:receive_events(Subscription)),
          Outcome
      end,
      QueryResult = emqx_common_test_helpers:with_failure(
          timeout, ProxyName, ProxyHost, ProxyPort, QueryUnderFailure
      ),
      case QueryResult of
          {_, Alias} when is_reference(Alias) ->
              %% Async mode: we may receive a successful result depending on
              %% timing, if the request is retried after the failure is healed.
              case receive_result(Alias, 15_000) of
                  {ok, {error, {unrecoverable_error, _}}} ->
                      ok;
                  {ok, {ok, _}} ->
                      ok;
                  {ok, Unexpected} ->
                      ct:fail("unexpected result: ~p", [Unexpected]);
                  timeout ->
                      ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
                      ct:fail("no response received")
              end;
          _ ->
              ?assertMatch({error, {resource_error, #{reason := timeout}}}, QueryResult)
      end,
      ok.
  %% A raw `{query, SQL}' request returns the query result when batching is
  %% off, and is rejected with `batch_prepare_not_implemented' when it is on.
  t_simple_sql_query(Config) ->
      BatchEnabled = ?config(enable_batch, Config),
      QueryMode = ?config(query_mode, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {query, <<"SELECT count(1) AS T">>},
      Result =
          case QueryMode of
              async ->
                  {_, Alias} = query_resource_async(Config, Request),
                  {ok, AsyncResult} = receive_result(Alias, 2_000),
                  AsyncResult;
              sync ->
                  query_resource(Config, Request)
          end,
      case BatchEnabled of
          false ->
              ?assertMatch({ok, _, [{1}]}, Result);
          true ->
              ?assertEqual(
                  {error, {unrecoverable_error, batch_prepare_not_implemented}}, Result
              )
      end,
      ok.
  %% Sending an empty message violates the NOT NULL constraint on `arrived';
  %% the flush ack must carry the corresponding unrecoverable error (23502).
  t_missing_data(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      {_, {ok, AckEvent}} =
          ?wait_async_action(
              send_message(Config, #{}),
              #{?snk_kind := buffer_worker_flush_ack},
              2_000
          ),
      ?assertMatch(
          #{
              result :=
                  {error,
                      {unrecoverable_error, #{
                          error_code := <<"23502">>,
                          error_codename := not_null_violation,
                          severity := error
                      }}}
          },
          AckEvent
      ),
      ok.
  %% A malformed `{query, SQL, Params}' request must come back as an
  %% unrecoverable error; with batching enabled the reason is exactly
  %% `invalid_request'.
  t_bad_sql_parameter(Config) ->
      BatchEnabled = ?config(enable_batch, Config),
      QueryMode = ?config(query_mode, Config),
      ?assertMatch({ok, _}, create_bridge(Config)),
      Request = {query, <<"">>, [bad_parameter]},
      Result =
          case QueryMode of
              async ->
                  {_, Alias} = query_resource_async(Config, Request),
                  {ok, AsyncResult} = receive_result(Alias, 2_000),
                  AsyncResult;
              sync ->
                  query_resource(Config, Request)
          end,
      case BatchEnabled of
          false ->
              ?assertMatch({error, {unrecoverable_error, _}}, Result);
          true ->
              ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result)
      end,
      ok.
  %% A payload made of every byte from 1 to 127 (quotes, backslashes, control
  %% characters, ...) must be stored and read back unchanged.
  t_nasty_sql_string(Config) ->
      ?assertMatch({ok, _}, create_bridge(Config)),
      NastyPayload = list_to_binary(lists:seq(1, 127)),
      Message = #{
          payload => NastyPayload,
          timestamp => erlang:system_time(millisecond)
      },
      {_, {ok, _}} =
          ?wait_async_action(
              send_message(Config, Message),
              #{?snk_kind := pgsql_connector_query_return},
              1_000
          ),
      ?assertEqual(NastyPayload, connect_and_get_payload(Config)).
  %% When the target table does not exist, the bridge is still created but
  %% never becomes connected, queries fail with `unhealthy_target', and a
  %% `pgsql_undefined_table' event is traced. The table is re-created at the
  %% end of the test case.
  t_missing_table(Config) ->
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      ?check_trace(
          begin
              connect_and_drop_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              %% Up to 20 attempts, 1000 ms apart.
              ?retry(
                  1_000,
                  20,
                  ?assertMatch(
                      #{status := Status} when
                          Status =:= connecting orelse Status =:= disconnected,
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              Payload = integer_to_binary(erlang:unique_integer()),
              Message = #{payload => Payload, timestamp => 1668602148000},
              ?assertMatch(
                  {error, {resource_error, #{reason := unhealthy_target}}},
                  query_resource(Config, {send_message, Message})
              ),
              ok
          end,
          fun(Trace) ->
              UndefinedTableEvents = ?of_kind(pgsql_undefined_table, Trace),
              ?assertMatch([_ | _], UndefinedTableEvents),
              ok
          end
      ),
      connect_and_create_table(Config),
      ok.
  %% We test that we can handle the case where a prepared statement with the
  %% channel name already exists in the connection instance when we try to
  %% make a new prepared statement. It is unknown in which scenario this can
  %% happen, but it has been observed in a production log file.
  %% See:
  %% https://emqx.atlassian.net/browse/EEC-1036
  t_prepared_statement_exists(Config) ->
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      emqx_common_test_helpers:on_exit(fun() -> meck:unload() end),
      meck:new(epgsql, [passthrough, no_link, no_history]),
      %% First scenario: parse2 is executed twice on the first call (which
      %% creates the duplicate), then the mock removes itself so that later
      %% calls go straight to the real implementation.
      DuplicateOnce =
          fun(Conn, Key, SQL, Types) ->
              meck:passthrough([Conn, Key, SQL, Types]),
              meck:delete(epgsql, parse2, 4),
              meck:passthrough([Conn, Key, SQL, Types])
          end,
      meck:expect(epgsql, parse2, DuplicateOnce),
      %% We should recover if the prepared statement name already exists in
      %% the driver.
      ?check_trace(
          begin
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  1_000,
                  20,
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              ok
          end,
          fun(Trace) ->
              ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
              ok
          end
      ),
      %% Second scenario: every parse2 call is executed twice, so the
      %% duplicate comes back no matter how often it is removed.
      DuplicateAlways =
          fun(Conn, Key, SQL, Types) ->
              meck:passthrough([Conn, Key, SQL, Types]),
              meck:passthrough([Conn, Key, SQL, Types])
          end,
      meck:expect(epgsql, parse2, DuplicateAlways),
      %% We should get status disconnected if removing the already existing
      %% statement doesn't help.
      ?check_trace(
          begin
              ?assertMatch({ok, _}, create_bridge(Config)),
              ?retry(
                  1_000,
                  20,
                  ?assertMatch(
                      #{status := disconnected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              snabbkaffe_nemesis:cleanup(),
              ok
          end,
          fun(Trace) ->
              ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
              ok
          end
      ),
      meck:unload(),
      ok.
  %% Dropping the table underneath a connected bridge must turn the health
  %% check into `disconnected' with an `unhealthy_target' error, and a query
  %% sent afterwards must fail with an unrecoverable or `not_connected'
  %% resource error. The table is re-created at the end of the test case.
  t_table_removed(Config) ->
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      ?check_trace(
          begin
              ct:pal("creating table"),
              connect_and_create_table(Config),
              ct:pal("creating bridge"),
              FastHealthCheck = #{
                  <<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}
              },
              ?assertMatch({ok, _}, create_bridge(Config, FastHealthCheck)),
              ct:pal("checking bridge health"),
              %% Up to 200 attempts, 100 ms apart.
              ?retry(
                  100,
                  200,
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              ct:pal("dropping table"),
              connect_and_drop_table(Config),
              Payload = integer_to_binary(erlang:unique_integer()),
              Message = #{payload => Payload, timestamp => 1668602148000},
              ActionId = emqx_bridge_v2:id(BridgeType, Name),
              ?retry(
                  100,
                  200,
                  ?assertMatch(
                      #{error := {unhealthy_target, _}, status := disconnected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              ct:pal("sending query"),
              case query_resource_sync(Config, {ActionId, Message}) of
                  ?RESOURCE_ERROR_M(not_connected, _) ->
                      ok;
                  {error, {unrecoverable_error, _}} ->
                      ok;
                  Unexpected ->
                      ct:fail("unexpected result: ~p", [Unexpected])
              end,
              ok
          end,
          []
      ),
      connect_and_create_table(Config),
      ok.
  %% Twenty health checks issued in parallel must all report `connected' and
  %% must not produce any `postgres_connector_bad_parse2' trace event.
  t_concurrent_health_checks(Config) ->
      BridgeType = ?config(pgsql_bridge_type, Config),
      Name = ?config(pgsql_name, Config),
      ?check_trace(
          begin
              connect_and_create_table(Config),
              ?assertMatch({ok, _}, create_bridge(Config)),
              %% Up to 20 attempts, 1000 ms apart.
              ?retry(
                  1_000,
                  20,
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              ),
              CheckOnce = fun(_N) ->
                  ?assertMatch(
                      #{status := connected},
                      emqx_bridge_v2:health_check(BridgeType, Name)
                  )
              end,
              emqx_utils:pmap(CheckOnce, lists:seq(1, 20)),
              ok
          end,
          fun(Trace) ->
              ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
              ok
          end
      ),
      ok.