emqx_bridge_v2_pgsql_SUITE.erl 12 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_pgsql_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(BRIDGE_TYPE, pgsql).
  22. -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
  23. -define(CONNECTOR_TYPE, pgsql).
  24. -define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
  25. -import(emqx_common_test_helpers, [on_exit/1]).
  26. -import(emqx_utils_conv, [bin/1]).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. All0 = emqx_common_test_helpers:all(?MODULE),
  32. All = All0 -- matrix_cases(),
  33. Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
  34. Groups ++ All.
  35. matrix_cases() ->
  36. [
  37. t_disable_prepared_statements
  38. ].
  39. groups() ->
  40. emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
  41. init_per_suite(Config) ->
  42. PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  43. PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  44. case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of
  45. true ->
  46. Apps = emqx_cth_suite:start(
  47. [
  48. emqx,
  49. emqx_conf,
  50. emqx_connector,
  51. emqx_bridge,
  52. emqx_bridge_pgsql,
  53. emqx_rule_engine,
  54. emqx_management,
  55. emqx_mgmt_api_test_util:emqx_dashboard()
  56. ],
  57. #{work_dir => emqx_cth_suite:work_dir(Config)}
  58. ),
  59. NConfig = [
  60. {apps, Apps},
  61. {pgsql_host, PostgresHost},
  62. {pgsql_port, PostgresPort},
  63. {enable_tls, false},
  64. {postgres_host, PostgresHost},
  65. {postgres_port, PostgresPort}
  66. | Config
  67. ],
  68. emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig),
  69. NConfig;
  70. false ->
  71. case os:getenv("IS_CI") of
  72. "yes" ->
  73. throw(no_postgres);
  74. _ ->
  75. {skip, no_postgres}
  76. end
  77. end.
  78. end_per_suite(Config) ->
  79. Apps = ?config(apps, Config),
  80. emqx_cth_suite:stop(Apps),
  81. ok.
  82. init_per_group(Group, Config) when
  83. Group =:= postgres;
  84. Group =:= timescale;
  85. Group =:= matrix
  86. ->
  87. [
  88. {bridge_type, group_to_type(Group)},
  89. {connector_type, group_to_type(Group)}
  90. | Config
  91. ];
  92. init_per_group(batch_enabled, Config) ->
  93. [
  94. {batch_size, 10},
  95. {batch_time, <<"10ms">>}
  96. | Config
  97. ];
  98. init_per_group(batch_disabled, Config) ->
  99. [
  100. {batch_size, 1},
  101. {batch_time, <<"0ms">>}
  102. | Config
  103. ];
  104. init_per_group(_Group, Config) ->
  105. Config.
  106. group_to_type(postgres) -> pgsql;
  107. group_to_type(Group) -> Group.
  108. end_per_group(_Group, _Config) ->
  109. ok.
  110. init_per_testcase(TestCase, Config) ->
  111. ct:timetrap(timer:seconds(60)),
  112. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  113. emqx_config:delete_override_conf_files(),
  114. UniqueNum = integer_to_binary(erlang:unique_integer()),
  115. Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
  116. Username = <<"root">>,
  117. Password = <<"public">>,
  118. Passfile = filename:join(?config(priv_dir, Config), "passfile"),
  119. ok = file:write_file(Passfile, Password),
  120. NConfig = [
  121. {postgres_username, Username},
  122. {postgres_password, Password},
  123. {postgres_passfile, Passfile}
  124. | Config
  125. ],
  126. ConnectorConfig = connector_config(Name, NConfig),
  127. BridgeConfig = bridge_config(Name, Name),
  128. ok = snabbkaffe:start_trace(),
  129. [
  130. {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
  131. {connector_name, Name},
  132. {connector_config, ConnectorConfig},
  133. {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
  134. {bridge_name, Name},
  135. {bridge_config, BridgeConfig}
  136. | NConfig
  137. ].
  138. end_per_testcase(_Testcase, Config) ->
  139. case proplists:get_bool(skip_does_not_apply, Config) of
  140. true ->
  141. ok;
  142. false ->
  143. emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
  144. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  145. emqx_common_test_helpers:call_janitor(60_000),
  146. ok = snabbkaffe:stop(),
  147. ok
  148. end.
  149. %%------------------------------------------------------------------------------
  150. %% Helper fns
  151. %%------------------------------------------------------------------------------
  152. connector_config(Name, Config) ->
  153. PostgresHost = ?config(postgres_host, Config),
  154. PostgresPort = ?config(postgres_port, Config),
  155. Username = ?config(postgres_username, Config),
  156. PassFile = ?config(postgres_passfile, Config),
  157. InnerConfigMap0 =
  158. #{
  159. <<"enable">> => true,
  160. <<"database">> => <<"mqtt">>,
  161. <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]),
  162. <<"pool_size">> => 8,
  163. <<"username">> => Username,
  164. <<"password">> => iolist_to_binary(["file://", PassFile]),
  165. <<"resource_opts">> => #{
  166. <<"health_check_interval">> => <<"15s">>,
  167. <<"start_after_created">> => true,
  168. <<"start_timeout">> => <<"5s">>
  169. }
  170. },
  171. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  172. parse_and_check_connector_config(InnerConfigMap, Name).
  173. parse_and_check_connector_config(InnerConfigMap, Name) ->
  174. TypeBin = ?CONNECTOR_TYPE_BIN,
  175. RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
  176. #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
  177. hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
  178. required => false, atom_key => false
  179. }),
  180. ct:pal("parsed config: ~p", [Config]),
  181. InnerConfigMap.
  182. bridge_config(Name, ConnectorId) ->
  183. InnerConfigMap0 =
  184. #{
  185. <<"enable">> => true,
  186. <<"connector">> => ConnectorId,
  187. <<"parameters">> =>
  188. #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
  189. <<"local_topic">> => <<"t/postgres">>,
  190. <<"resource_opts">> => #{
  191. <<"batch_size">> => 1,
  192. <<"batch_time">> => <<"0ms">>,
  193. <<"buffer_mode">> => <<"memory_only">>,
  194. <<"buffer_seg_bytes">> => <<"10MB">>,
  195. <<"health_check_interval">> => <<"15s">>,
  196. <<"inflight_window">> => 100,
  197. <<"max_buffer_bytes">> => <<"256MB">>,
  198. <<"metrics_flush_interval">> => <<"1s">>,
  199. <<"query_mode">> => <<"sync">>,
  200. <<"request_ttl">> => <<"45s">>,
  201. <<"resume_interval">> => <<"15s">>,
  202. <<"worker_pool_size">> => <<"1">>
  203. }
  204. },
  205. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  206. parse_and_check_bridge_config(InnerConfigMap, Name).
  207. %% check it serializes correctly
  208. serde_roundtrip(InnerConfigMap0) ->
  209. IOList = hocon_pp:do(InnerConfigMap0, #{}),
  210. {ok, InnerConfigMap} = hocon:binary(IOList),
  211. InnerConfigMap.
  212. parse_and_check_bridge_config(InnerConfigMap, Name) ->
  213. emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
  214. make_message() ->
  215. ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
  216. Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
  217. #{
  218. clientid => ClientId,
  219. payload => Payload,
  220. timestamp => 1668602148000
  221. }.
  222. %%------------------------------------------------------------------------------
  223. %% Testcases
  224. %%------------------------------------------------------------------------------
  225. t_start_stop(Config) ->
  226. emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped),
  227. ok.
  228. t_create_via_http(Config) ->
  229. emqx_bridge_v2_testlib:t_create_via_http(Config),
  230. ok.
  231. t_on_get_status(Config) ->
  232. emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  233. ok.
  234. t_sync_query(Config) ->
  235. ok = emqx_bridge_v2_testlib:t_sync_query(
  236. Config,
  237. fun make_message/0,
  238. fun(Res) -> ?assertMatch({ok, _}, Res) end,
  239. postgres_bridge_connector_on_query_return
  240. ),
  241. ok.
  242. t_start_action_or_source_with_disabled_connector(Config) ->
  243. ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
  244. ok.
  245. t_disable_prepared_statements(matrix) ->
  246. [
  247. [postgres, batch_disabled],
  248. [postgres, batch_enabled],
  249. [timescale, batch_disabled],
  250. [timescale, batch_enabled],
  251. [matrix, batch_disabled],
  252. [matrix, batch_enabled]
  253. ];
  254. t_disable_prepared_statements(Config0) ->
  255. BatchSize = ?config(batch_size, Config0),
  256. BatchTime = ?config(batch_time, Config0),
  257. ConnectorConfig0 = ?config(connector_config, Config0),
  258. ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
  259. BridgeConfig0 = ?config(bridge_config, Config0),
  260. BridgeConfig = emqx_utils_maps:deep_merge(
  261. BridgeConfig0,
  262. #{
  263. <<"resource_opts">> => #{
  264. <<"batch_size">> => BatchSize,
  265. <<"batch_time">> => BatchTime,
  266. <<"query_mode">> => <<"async">>
  267. }
  268. }
  269. ),
  270. Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
  271. Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}),
  272. ?check_trace(
  273. #{timetrap => 5_000},
  274. begin
  275. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
  276. RuleTopic = <<"t/postgres">>,
  277. Type = ?config(bridge_type, Config),
  278. {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
  279. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  280. ?retry(
  281. _Sleep = 1_000,
  282. _Attempts = 20,
  283. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  284. ),
  285. {ok, C} = emqtt:start_link(),
  286. {ok, _} = emqtt:connect(C),
  287. lists:foreach(
  288. fun(N) ->
  289. emqtt:publish(C, RuleTopic, integer_to_binary(N))
  290. end,
  291. lists:seq(1, BatchSize)
  292. ),
  293. case BatchSize > 1 of
  294. true ->
  295. ?block_until(#{
  296. ?snk_kind := "postgres_success_batch_result",
  297. row_count := BatchSize
  298. }),
  299. ok;
  300. false ->
  301. ok
  302. end,
  303. ok
  304. end,
  305. []
  306. ),
  307. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  308. ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  309. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  310. ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
  311. ok.