emqx_bridge_pgsql_SUITE.erl 20 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. % SQL definitions
  11. -define(SQL_BRIDGE,
  12. "INSERT INTO mqtt_test(payload, arrived) "
  13. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  14. ).
  15. -define(SQL_CREATE_TABLE,
  16. "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
  17. ).
  18. -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
  19. -define(SQL_DELETE, "DELETE from mqtt_test").
  20. -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
  21. % DB defaults
  22. -define(PGSQL_DATABASE, "mqtt").
  23. -define(PGSQL_USERNAME, "root").
  24. -define(PGSQL_PASSWORD, "public").
  25. -define(BATCH_SIZE, 10).
  26. %%------------------------------------------------------------------------------
  27. %% CT boilerplate
  28. %%------------------------------------------------------------------------------
  29. all() ->
  30. [
  31. {group, tcp},
  32. {group, tls}
  33. ].
  34. groups() ->
  35. TCs = emqx_common_test_helpers:all(?MODULE),
  36. NonBatchCases = [t_write_timeout],
  37. BatchVariantGroups = [
  38. {group, with_batch},
  39. {group, without_batch},
  40. {group, matrix},
  41. {group, timescale}
  42. ],
  43. QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
  44. [
  45. {tcp, QueryModeGroups},
  46. {tls, QueryModeGroups},
  47. {async, BatchVariantGroups},
  48. {sync, BatchVariantGroups},
  49. {with_batch, TCs -- NonBatchCases},
  50. {without_batch, TCs},
  51. {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
  52. {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
  53. ].
  54. init_per_group(tcp, Config) ->
  55. Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  56. Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  57. [
  58. {pgsql_host, Host},
  59. {pgsql_port, Port},
  60. {enable_tls, false},
  61. {proxy_name, "pgsql_tcp"}
  62. | Config
  63. ];
  64. init_per_group(tls, Config) ->
  65. Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
  66. Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
  67. [
  68. {pgsql_host, Host},
  69. {pgsql_port, Port},
  70. {enable_tls, true},
  71. {proxy_name, "pgsql_tls"}
  72. | Config
  73. ];
  74. init_per_group(async, Config) ->
  75. [{query_mode, async} | Config];
  76. init_per_group(sync, Config) ->
  77. [{query_mode, sync} | Config];
  78. init_per_group(with_batch, Config0) ->
  79. Config = [{enable_batch, true} | Config0],
  80. common_init(Config);
  81. init_per_group(without_batch, Config0) ->
  82. Config = [{enable_batch, false} | Config0],
  83. common_init(Config);
  84. init_per_group(matrix, Config0) ->
  85. Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
  86. common_init(Config);
  87. init_per_group(timescale, Config0) ->
  88. Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
  89. common_init(Config);
  90. init_per_group(_Group, Config) ->
  91. Config.
  92. end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
  93. connect_and_drop_table(Config),
  94. ProxyHost = ?config(proxy_host, Config),
  95. ProxyPort = ?config(proxy_port, Config),
  96. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  97. ok;
  98. end_per_group(_Group, _Config) ->
  99. ok.
  100. init_per_suite(Config) ->
  101. Config.
  102. end_per_suite(_Config) ->
  103. emqx_mgmt_api_test_util:end_suite(),
  104. ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
  105. ok.
  106. init_per_testcase(_Testcase, Config) ->
  107. connect_and_clear_table(Config),
  108. delete_bridge(Config),
  109. snabbkaffe:start_trace(),
  110. Config.
  111. end_per_testcase(_Testcase, Config) ->
  112. ProxyHost = ?config(proxy_host, Config),
  113. ProxyPort = ?config(proxy_port, Config),
  114. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  115. connect_and_clear_table(Config),
  116. ok = snabbkaffe:stop(),
  117. delete_bridge(Config),
  118. ok.
  119. %%------------------------------------------------------------------------------
  120. %% Helper fns
  121. %%------------------------------------------------------------------------------
  122. common_init(Config0) ->
  123. BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
  124. Host = ?config(pgsql_host, Config0),
  125. Port = ?config(pgsql_port, Config0),
  126. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  127. true ->
  128. % Setup toxiproxy
  129. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  130. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  131. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  132. % Ensure EE bridge module is loaded
  133. _ = application:load(emqx_ee_bridge),
  134. _ = emqx_ee_bridge:module_info(),
  135. ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
  136. emqx_mgmt_api_test_util:init_suite(),
  137. % Connect to pgsql directly and create the table
  138. connect_and_create_table(Config0),
  139. {Name, PGConf} = pgsql_config(BridgeType, Config0),
  140. Config =
  141. [
  142. {pgsql_config, PGConf},
  143. {pgsql_bridge_type, BridgeType},
  144. {pgsql_name, Name},
  145. {proxy_host, ProxyHost},
  146. {proxy_port, ProxyPort}
  147. | Config0
  148. ],
  149. Config;
  150. false ->
  151. case os:getenv("IS_CI") of
  152. "yes" ->
  153. throw(no_pgsql);
  154. _ ->
  155. {skip, no_pgsql}
  156. end
  157. end.
  158. pgsql_config(BridgeType, Config) ->
  159. Port = integer_to_list(?config(pgsql_port, Config)),
  160. Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
  161. Name = atom_to_binary(?MODULE),
  162. BatchSize =
  163. case ?config(enable_batch, Config) of
  164. true -> ?BATCH_SIZE;
  165. false -> 1
  166. end,
  167. QueryMode = ?config(query_mode, Config),
  168. TlsEnabled = ?config(enable_tls, Config),
  169. ConfigString =
  170. io_lib:format(
  171. "bridges.~s.~s {\n"
  172. " enable = true\n"
  173. " server = ~p\n"
  174. " database = ~p\n"
  175. " username = ~p\n"
  176. " password = ~p\n"
  177. " sql = ~p\n"
  178. " resource_opts = {\n"
  179. " request_timeout = 500ms\n"
  180. " batch_size = ~b\n"
  181. " query_mode = ~s\n"
  182. " }\n"
  183. " ssl = {\n"
  184. " enable = ~w\n"
  185. " }\n"
  186. "}",
  187. [
  188. BridgeType,
  189. Name,
  190. Server,
  191. ?PGSQL_DATABASE,
  192. ?PGSQL_USERNAME,
  193. ?PGSQL_PASSWORD,
  194. ?SQL_BRIDGE,
  195. BatchSize,
  196. QueryMode,
  197. TlsEnabled
  198. ]
  199. ),
  200. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
  201. parse_and_check(ConfigString, BridgeType, Name) ->
  202. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  203. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  204. #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
  205. Config.
  206. create_bridge(Config) ->
  207. create_bridge(Config, _Overrides = #{}).
  208. create_bridge(Config, Overrides) ->
  209. BridgeType = ?config(pgsql_bridge_type, Config),
  210. Name = ?config(pgsql_name, Config),
  211. PGConfig0 = ?config(pgsql_config, Config),
  212. PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
  213. emqx_bridge:create(BridgeType, Name, PGConfig).
  214. delete_bridge(Config) ->
  215. BridgeType = ?config(pgsql_bridge_type, Config),
  216. Name = ?config(pgsql_name, Config),
  217. emqx_bridge:remove(BridgeType, Name).
  218. create_bridge_http(Params) ->
  219. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  220. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  221. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  222. {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
  223. Error -> Error
  224. end.
  225. send_message(Config, Payload) ->
  226. Name = ?config(pgsql_name, Config),
  227. BridgeType = ?config(pgsql_bridge_type, Config),
  228. BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
  229. emqx_bridge:send_message(BridgeID, Payload).
  230. query_resource(Config, Request) ->
  231. Name = ?config(pgsql_name, Config),
  232. BridgeType = ?config(pgsql_bridge_type, Config),
  233. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  234. emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
  235. query_resource_async(Config, Request) ->
  236. Name = ?config(pgsql_name, Config),
  237. BridgeType = ?config(pgsql_bridge_type, Config),
  238. Ref = alias([reply]),
  239. AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
  240. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  241. Return = emqx_resource:query(ResourceID, Request, #{
  242. timeout => 500, async_reply_fun => {AsyncReplyFun, []}
  243. }),
  244. {Return, Ref}.
  245. receive_result(Ref, Timeout) ->
  246. receive
  247. {result, Ref, Result} ->
  248. {ok, Result};
  249. {Ref, Result} ->
  250. {ok, Result}
  251. after Timeout ->
  252. timeout
  253. end.
  254. connect_direct_pgsql(Config) ->
  255. Opts = #{
  256. host => ?config(pgsql_host, Config),
  257. port => ?config(pgsql_port, Config),
  258. username => ?PGSQL_USERNAME,
  259. password => ?PGSQL_PASSWORD,
  260. database => ?PGSQL_DATABASE
  261. },
  262. SslOpts =
  263. case ?config(enable_tls, Config) of
  264. true ->
  265. Opts#{
  266. ssl => true,
  267. ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
  268. };
  269. false ->
  270. Opts
  271. end,
  272. {ok, Con} = epgsql:connect(SslOpts),
  273. Con.
  274. % These funs connect and then stop the pgsql connection
  275. connect_and_create_table(Config) ->
  276. Con = connect_direct_pgsql(Config),
  277. {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
  278. ok = epgsql:close(Con).
  279. connect_and_drop_table(Config) ->
  280. Con = connect_direct_pgsql(Config),
  281. {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
  282. ok = epgsql:close(Con).
  283. connect_and_clear_table(Config) ->
  284. Con = connect_direct_pgsql(Config),
  285. {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
  286. ok = epgsql:close(Con).
  287. connect_and_get_payload(Config) ->
  288. Con = connect_direct_pgsql(Config),
  289. {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
  290. ok = epgsql:close(Con),
  291. Result.
  292. %%------------------------------------------------------------------------------
  293. %% Testcases
  294. %%------------------------------------------------------------------------------
  295. t_setup_via_config_and_publish(Config) ->
  296. ?assertMatch(
  297. {ok, _},
  298. create_bridge(Config)
  299. ),
  300. Val = integer_to_binary(erlang:unique_integer()),
  301. SentData = #{payload => Val, timestamp => 1668602148000},
  302. ?check_trace(
  303. begin
  304. {_, {ok, _}} =
  305. ?wait_async_action(
  306. send_message(Config, SentData),
  307. #{?snk_kind := pgsql_connector_query_return},
  308. 10_000
  309. ),
  310. ?assertMatch(
  311. Val,
  312. connect_and_get_payload(Config)
  313. ),
  314. ok
  315. end,
  316. fun(Trace0) ->
  317. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  318. case ?config(enable_batch, Config) of
  319. true ->
  320. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  321. false ->
  322. ?assertMatch([#{result := {ok, 1}}], Trace)
  323. end,
  324. ok
  325. end
  326. ),
  327. ok.
  328. t_setup_via_http_api_and_publish(Config) ->
  329. BridgeType = ?config(pgsql_bridge_type, Config),
  330. Name = ?config(pgsql_name, Config),
  331. PgsqlConfig0 = ?config(pgsql_config, Config),
  332. QueryMode = ?config(query_mode, Config),
  333. PgsqlConfig = PgsqlConfig0#{
  334. <<"name">> => Name,
  335. <<"type">> => BridgeType
  336. },
  337. ?assertMatch(
  338. {ok, _},
  339. create_bridge_http(PgsqlConfig)
  340. ),
  341. Val = integer_to_binary(erlang:unique_integer()),
  342. SentData = #{payload => Val, timestamp => 1668602148000},
  343. ?check_trace(
  344. begin
  345. {Res, {ok, _}} =
  346. ?wait_async_action(
  347. send_message(Config, SentData),
  348. #{?snk_kind := pgsql_connector_query_return},
  349. 10_000
  350. ),
  351. case QueryMode of
  352. async ->
  353. ok;
  354. sync ->
  355. ?assertEqual({ok, 1}, Res)
  356. end,
  357. ?assertMatch(
  358. Val,
  359. connect_and_get_payload(Config)
  360. ),
  361. ok
  362. end,
  363. fun(Trace0) ->
  364. Trace = ?of_kind(pgsql_connector_query_return, Trace0),
  365. case ?config(enable_batch, Config) of
  366. true ->
  367. ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
  368. false ->
  369. ?assertMatch([#{result := {ok, 1}}], Trace)
  370. end,
  371. ok
  372. end
  373. ),
  374. ok.
  375. t_get_status(Config) ->
  376. ?assertMatch(
  377. {ok, _},
  378. create_bridge(Config)
  379. ),
  380. ProxyPort = ?config(proxy_port, Config),
  381. ProxyHost = ?config(proxy_host, Config),
  382. ProxyName = ?config(proxy_name, Config),
  383. Name = ?config(pgsql_name, Config),
  384. BridgeType = ?config(pgsql_bridge_type, Config),
  385. ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
  386. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
  387. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  388. ?assertMatch(
  389. {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
  390. emqx_resource_manager:health_check(ResourceID)
  391. )
  392. end),
  393. ok.
  394. t_create_disconnected(Config) ->
  395. ProxyPort = ?config(proxy_port, Config),
  396. ProxyHost = ?config(proxy_host, Config),
  397. ProxyName = ?config(proxy_name, Config),
  398. ?check_trace(
  399. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  400. ?assertMatch({ok, _}, create_bridge(Config))
  401. end),
  402. fun(Trace) ->
  403. ?assertMatch(
  404. [#{error := {start_pool_failed, _, _}}],
  405. ?of_kind(pgsql_connector_start_failed, Trace)
  406. ),
  407. ok
  408. end
  409. ),
  410. ok.
  411. t_write_failure(Config) ->
  412. ProxyName = ?config(proxy_name, Config),
  413. ProxyPort = ?config(proxy_port, Config),
  414. ProxyHost = ?config(proxy_host, Config),
  415. QueryMode = ?config(query_mode, Config),
  416. {ok, _} = create_bridge(Config),
  417. Val = integer_to_binary(erlang:unique_integer()),
  418. SentData = #{payload => Val, timestamp => 1668602148000},
  419. ?check_trace(
  420. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  421. {_, {ok, _}} =
  422. ?wait_async_action(
  423. case QueryMode of
  424. sync ->
  425. ?assertMatch({error, _}, send_message(Config, SentData));
  426. async ->
  427. send_message(Config, SentData)
  428. end,
  429. #{?snk_kind := buffer_worker_flush_nack},
  430. 1_000
  431. )
  432. end),
  433. fun(Trace0) ->
  434. ct:pal("trace: ~p", [Trace0]),
  435. Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
  436. ?assertMatch([#{result := {error, _}} | _], Trace),
  437. [#{result := {error, Error}} | _] = Trace,
  438. case Error of
  439. {resource_error, _} ->
  440. ok;
  441. {recoverable_error, disconnected} ->
  442. ok;
  443. _ ->
  444. ct:fail("unexpected error: ~p", [Error])
  445. end
  446. end
  447. ),
  448. ok.
  449. % This test doesn't work with batch enabled since it is not possible
  450. % to set the timeout directly for batch queries
  451. t_write_timeout(Config) ->
  452. ProxyName = ?config(proxy_name, Config),
  453. ProxyPort = ?config(proxy_port, Config),
  454. ProxyHost = ?config(proxy_host, Config),
  455. QueryMode = ?config(query_mode, Config),
  456. {ok, _} = create_bridge(
  457. Config,
  458. #{
  459. <<"resource_opts">> => #{
  460. <<"request_timeout">> => 500,
  461. <<"resume_interval">> => 100,
  462. <<"health_check_interval">> => 100
  463. }
  464. }
  465. ),
  466. Val = integer_to_binary(erlang:unique_integer()),
  467. SentData = #{payload => Val, timestamp => 1668602148000},
  468. {ok, SRef} = snabbkaffe:subscribe(
  469. ?match_event(#{?snk_kind := call_query_enter}),
  470. 2_000
  471. ),
  472. Res0 =
  473. emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
  474. Res1 =
  475. case QueryMode of
  476. async ->
  477. query_resource_async(Config, {send_message, SentData});
  478. sync ->
  479. query_resource(Config, {send_message, SentData})
  480. end,
  481. ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
  482. Res1
  483. end),
  484. case Res0 of
  485. {_, Ref} when is_reference(Ref) ->
  486. case receive_result(Ref, 15_000) of
  487. {ok, Res} ->
  488. ?assertMatch({error, {unrecoverable_error, _}}, Res);
  489. timeout ->
  490. ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
  491. ct:fail("no response received")
  492. end;
  493. _ ->
  494. ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
  495. end,
  496. ok.
  497. t_simple_sql_query(Config) ->
  498. EnableBatch = ?config(enable_batch, Config),
  499. QueryMode = ?config(query_mode, Config),
  500. ?assertMatch(
  501. {ok, _},
  502. create_bridge(Config)
  503. ),
  504. Request = {sql, <<"SELECT count(1) AS T">>},
  505. Result =
  506. case QueryMode of
  507. sync ->
  508. query_resource(Config, Request);
  509. async ->
  510. {_, Ref} = query_resource_async(Config, Request),
  511. {ok, Res} = receive_result(Ref, 2_000),
  512. Res
  513. end,
  514. case EnableBatch of
  515. true ->
  516. ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
  517. false ->
  518. ?assertMatch({ok, _, [{1}]}, Result)
  519. end,
  520. ok.
  521. t_missing_data(Config) ->
  522. ?assertMatch(
  523. {ok, _},
  524. create_bridge(Config)
  525. ),
  526. {_, {ok, Event}} =
  527. ?wait_async_action(
  528. send_message(Config, #{}),
  529. #{?snk_kind := buffer_worker_flush_ack},
  530. 2_000
  531. ),
  532. ?assertMatch(
  533. #{
  534. result :=
  535. {error,
  536. {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
  537. },
  538. Event
  539. ),
  540. ok.
  541. t_bad_sql_parameter(Config) ->
  542. QueryMode = ?config(query_mode, Config),
  543. EnableBatch = ?config(enable_batch, Config),
  544. ?assertMatch(
  545. {ok, _},
  546. create_bridge(Config)
  547. ),
  548. Request = {sql, <<"">>, [bad_parameter]},
  549. Result =
  550. case QueryMode of
  551. sync ->
  552. query_resource(Config, Request);
  553. async ->
  554. {_, Ref} = query_resource_async(Config, Request),
  555. {ok, Res} = receive_result(Ref, 2_000),
  556. Res
  557. end,
  558. case EnableBatch of
  559. true ->
  560. ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
  561. false ->
  562. ?assertMatch(
  563. {error, {unrecoverable_error, _}}, Result
  564. )
  565. end,
  566. ok.
  567. t_nasty_sql_string(Config) ->
  568. ?assertMatch({ok, _}, create_bridge(Config)),
  569. Payload = list_to_binary(lists:seq(1, 127)),
  570. Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
  571. {_, {ok, _}} =
  572. ?wait_async_action(
  573. send_message(Config, Message),
  574. #{?snk_kind := pgsql_connector_query_return},
  575. 1_000
  576. ),
  577. ?assertEqual(Payload, connect_and_get_payload(Config)).