emqx_bridge_v2_pgsql_SUITE.erl 14 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_pgsql_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(BRIDGE_TYPE, pgsql).
  22. -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
  23. -define(CONNECTOR_TYPE, pgsql).
  24. -define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
  25. -import(emqx_common_test_helpers, [on_exit/1]).
  26. -import(emqx_utils_conv, [bin/1]).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. All0 = emqx_common_test_helpers:all(?MODULE),
  32. All = All0 -- matrix_cases(),
  33. Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
  34. Groups ++ All.
  35. matrix_cases() ->
  36. [
  37. t_disable_prepared_statements
  38. ].
  39. groups() ->
  40. emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
  41. init_per_suite(Config) ->
  42. PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  43. PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  44. case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of
  45. true ->
  46. Apps = emqx_cth_suite:start(
  47. [
  48. emqx,
  49. emqx_conf,
  50. emqx_connector,
  51. emqx_bridge,
  52. emqx_bridge_pgsql,
  53. emqx_rule_engine,
  54. emqx_management,
  55. emqx_mgmt_api_test_util:emqx_dashboard()
  56. ],
  57. #{work_dir => emqx_cth_suite:work_dir(Config)}
  58. ),
  59. NConfig = [
  60. {apps, Apps},
  61. {pgsql_host, PostgresHost},
  62. {pgsql_port, PostgresPort},
  63. {enable_tls, false},
  64. {postgres_host, PostgresHost},
  65. {postgres_port, PostgresPort}
  66. | Config
  67. ],
  68. emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig),
  69. NConfig;
  70. false ->
  71. case os:getenv("IS_CI") of
  72. "yes" ->
  73. throw(no_postgres);
  74. _ ->
  75. {skip, no_postgres}
  76. end
  77. end.
  78. end_per_suite(Config) ->
  79. Apps = ?config(apps, Config),
  80. emqx_cth_suite:stop(Apps),
  81. ok.
  82. init_per_group(Group, Config) when
  83. Group =:= postgres;
  84. Group =:= timescale;
  85. Group =:= matrix
  86. ->
  87. [
  88. {bridge_type, group_to_type(Group)},
  89. {connector_type, group_to_type(Group)}
  90. | Config
  91. ];
  92. init_per_group(batch_enabled, Config) ->
  93. [
  94. {batch_size, 10},
  95. {batch_time, <<"10ms">>}
  96. | Config
  97. ];
  98. init_per_group(batch_disabled, Config) ->
  99. [
  100. {batch_size, 1},
  101. {batch_time, <<"0ms">>}
  102. | Config
  103. ];
  104. init_per_group(_Group, Config) ->
  105. Config.
  106. group_to_type(postgres) -> pgsql;
  107. group_to_type(Group) -> Group.
  108. end_per_group(_Group, _Config) ->
  109. ok.
  110. init_per_testcase(TestCase, Config) ->
  111. ct:timetrap(timer:seconds(60)),
  112. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  113. emqx_config:delete_override_conf_files(),
  114. UniqueNum = integer_to_binary(erlang:unique_integer()),
  115. Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
  116. Username = <<"root">>,
  117. Password = <<"public">>,
  118. Passfile = filename:join(?config(priv_dir, Config), "passfile"),
  119. ok = file:write_file(Passfile, Password),
  120. NConfig = [
  121. {postgres_username, Username},
  122. {postgres_password, Password},
  123. {postgres_passfile, Passfile}
  124. | Config
  125. ],
  126. ConnectorConfig = connector_config(Name, NConfig),
  127. BridgeConfig = bridge_config(Name, Name),
  128. ok = snabbkaffe:start_trace(),
  129. [
  130. {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
  131. {connector_name, Name},
  132. {connector_config, ConnectorConfig},
  133. {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
  134. {bridge_name, Name},
  135. {bridge_config, BridgeConfig}
  136. | NConfig
  137. ].
  138. end_per_testcase(_Testcase, Config) ->
  139. case proplists:get_bool(skip_does_not_apply, Config) of
  140. true ->
  141. ok;
  142. false ->
  143. emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
  144. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  145. emqx_common_test_helpers:call_janitor(60_000),
  146. ok = snabbkaffe:stop(),
  147. ok
  148. end.
  149. %%------------------------------------------------------------------------------
  150. %% Helper fns
  151. %%------------------------------------------------------------------------------
  152. connector_config(Name, Config) ->
  153. PostgresHost = ?config(postgres_host, Config),
  154. PostgresPort = ?config(postgres_port, Config),
  155. Username = ?config(postgres_username, Config),
  156. PassFile = ?config(postgres_passfile, Config),
  157. InnerConfigMap0 =
  158. #{
  159. <<"enable">> => true,
  160. <<"database">> => <<"mqtt">>,
  161. <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]),
  162. <<"pool_size">> => 8,
  163. <<"username">> => Username,
  164. <<"password">> => iolist_to_binary(["file://", PassFile]),
  165. <<"resource_opts">> => #{
  166. <<"health_check_interval">> => <<"15s">>,
  167. <<"start_after_created">> => true,
  168. <<"start_timeout">> => <<"5s">>
  169. }
  170. },
  171. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  172. emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap).
  173. bridge_config(Name, ConnectorId) ->
  174. InnerConfigMap0 =
  175. #{
  176. <<"enable">> => true,
  177. <<"connector">> => ConnectorId,
  178. <<"parameters">> =>
  179. #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
  180. <<"local_topic">> => <<"t/postgres">>,
  181. <<"resource_opts">> => #{
  182. <<"batch_size">> => 1,
  183. <<"batch_time">> => <<"0ms">>,
  184. <<"buffer_mode">> => <<"memory_only">>,
  185. <<"buffer_seg_bytes">> => <<"10MB">>,
  186. <<"health_check_interval">> => <<"15s">>,
  187. <<"inflight_window">> => 100,
  188. <<"max_buffer_bytes">> => <<"256MB">>,
  189. <<"metrics_flush_interval">> => <<"1s">>,
  190. <<"query_mode">> => <<"sync">>,
  191. <<"request_ttl">> => <<"45s">>,
  192. <<"resume_interval">> => <<"15s">>,
  193. <<"worker_pool_size">> => <<"1">>
  194. }
  195. },
  196. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  197. parse_and_check_bridge_config(InnerConfigMap, Name).
  198. %% check it serializes correctly
  199. serde_roundtrip(InnerConfigMap0) ->
  200. IOList = hocon_pp:do(InnerConfigMap0, #{}),
  201. {ok, InnerConfigMap} = hocon:binary(IOList),
  202. InnerConfigMap.
  203. parse_and_check_bridge_config(InnerConfigMap, Name) ->
  204. emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
  205. make_message() ->
  206. ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
  207. Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
  208. #{
  209. clientid => ClientId,
  210. payload => Payload,
  211. timestamp => 1668602148000
  212. }.
  213. %%------------------------------------------------------------------------------
  214. %% Testcases
  215. %%------------------------------------------------------------------------------
  216. t_start_stop(Config) ->
  217. emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped),
  218. ok.
  219. t_create_via_http(Config) ->
  220. emqx_bridge_v2_testlib:t_create_via_http(Config),
  221. ok.
  222. t_on_get_status(Config) ->
  223. emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  224. ok.
  225. t_sync_query(Config) ->
  226. ok = emqx_bridge_v2_testlib:t_sync_query(
  227. Config,
  228. fun make_message/0,
  229. fun(Res) -> ?assertMatch({ok, _}, Res) end,
  230. postgres_bridge_connector_on_query_return
  231. ),
  232. ok.
  233. t_start_action_or_source_with_disabled_connector(Config) ->
  234. ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
  235. ok.
  236. t_disable_prepared_statements(matrix) ->
  237. [
  238. [postgres, batch_disabled],
  239. [postgres, batch_enabled],
  240. [timescale, batch_disabled],
  241. [timescale, batch_enabled],
  242. [matrix, batch_disabled],
  243. [matrix, batch_enabled]
  244. ];
  245. t_disable_prepared_statements(Config0) ->
  246. BatchSize = ?config(batch_size, Config0),
  247. BatchTime = ?config(batch_time, Config0),
  248. ConnectorConfig0 = ?config(connector_config, Config0),
  249. ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
  250. BridgeConfig0 = ?config(bridge_config, Config0),
  251. BridgeConfig = emqx_utils_maps:deep_merge(
  252. BridgeConfig0,
  253. #{
  254. <<"resource_opts">> => #{
  255. <<"batch_size">> => BatchSize,
  256. <<"batch_time">> => BatchTime,
  257. <<"query_mode">> => <<"async">>
  258. }
  259. }
  260. ),
  261. Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
  262. Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}),
  263. ?check_trace(
  264. #{timetrap => 5_000},
  265. begin
  266. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
  267. RuleTopic = <<"t/postgres">>,
  268. Type = ?config(bridge_type, Config),
  269. {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
  270. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  271. ?retry(
  272. _Sleep = 1_000,
  273. _Attempts = 20,
  274. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  275. ),
  276. {ok, C} = emqtt:start_link(),
  277. {ok, _} = emqtt:connect(C),
  278. lists:foreach(
  279. fun(N) ->
  280. emqtt:publish(C, RuleTopic, integer_to_binary(N))
  281. end,
  282. lists:seq(1, BatchSize)
  283. ),
  284. case BatchSize > 1 of
  285. true ->
  286. ?block_until(#{
  287. ?snk_kind := "postgres_success_batch_result",
  288. row_count := BatchSize
  289. }),
  290. ok;
  291. false ->
  292. ok
  293. end,
  294. ok
  295. end,
  296. []
  297. ),
  298. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  299. ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  300. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  301. ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
  302. ok.
  303. t_update_with_invalid_prepare(Config) ->
  304. ConnectorName = ?config(connector_name, Config),
  305. BridgeName = ?config(bridge_name, Config),
  306. {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
  307. %% arrivedx is a bad column name
  308. BadSQL = <<
  309. "INSERT INTO mqtt_test(payload, arrivedx) "
  310. "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  311. >>,
  312. Override = #{<<"parameters">> => #{<<"sql">> => BadSQL}},
  313. {ok, {{_, 200, "OK"}, _Headers1, Body1}} =
  314. emqx_bridge_v2_testlib:update_bridge_api(Config, Override),
  315. ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
  316. Error1 = maps:get(<<"error">>, Body1),
  317. case re:run(Error1, <<"undefined_column">>, [{capture, none}]) of
  318. match ->
  319. ok;
  320. nomatch ->
  321. ct:fail(#{
  322. expected_pattern => "undefined_column",
  323. got => Error1
  324. })
  325. end,
  326. %% assert that although there was an error returned, the invliad SQL is actually put
  327. C1 = [{action_name, BridgeName}, {action_type, pgsql} | Config],
  328. {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
  329. #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
  330. ?assertEqual(FetchedSQL, BadSQL),
  331. %% update again with the original sql
  332. {ok, {{_, 200, "OK"}, _Headers2, Body2}} =
  333. emqx_bridge_v2_testlib:update_bridge_api(Config, #{}),
  334. %% the error should be gone now, and status should be 'connected'
  335. ?assertMatch(#{<<"error">> := <<>>, <<"status">> := <<"connected">>}, Body2),
  336. %% finally check if ecpool worker should have exactly one of reconnect callback
  337. ConnectorResId = <<"connector:pgsql:", ConnectorName/binary>>,
  338. Workers = ecpool:workers(ConnectorResId),
  339. [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
  340. lists:foreach(
  341. fun(Pid) ->
  342. [{emqx_postgresql, prepare_sql_to_conn, Args}] =
  343. ecpool_worker:get_reconnect_callbacks(Pid),
  344. Sig = emqx_postgresql:get_reconnect_callback_signature(Args),
  345. BridgeResId = <<"action:pgsql:", BridgeName/binary, $:, ConnectorResId/binary>>,
  346. ?assertEqual(BridgeResId, Sig)
  347. end,
  348. WorkerPids
  349. ),
  350. ok.