emqx_bridge_v2_pgsql_SUITE.erl 12 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_pgsql_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(BRIDGE_TYPE, pgsql).
  22. -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
  23. -define(CONNECTOR_TYPE, pgsql).
  24. -define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
  25. -import(emqx_common_test_helpers, [on_exit/1]).
  26. -import(emqx_utils_conv, [bin/1]).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  %% CT callback: the list of things to run for this suite.
  %% Every group produced by groups/0 is listed first, followed by the
  %% test cases discovered in this module minus the matrix cases (those are
  %% reached through their groups instead).
  all() ->
      PlainCases = emqx_common_test_helpers:all(?MODULE) -- matrix_cases(),
      ToGroupRef = fun({GroupName, _, _}) -> {group, GroupName} end,
      lists:map(ToGroupRef, groups()) ++ PlainCases.
  %% Test cases that are handed to the matrix-to-groups expansion in groups/0
  %% and therefore removed from the plain case list in all/0.
  matrix_cases() ->
      [t_disable_prepared_statements].
  %% CT callback: group definitions, derived from the matrix cases of this
  %% module by the shared test helper.
  groups() ->
      MatrixCases = matrix_cases(),
      emqx_common_test_helpers:matrix_to_groups(?MODULE, MatrixCases).
  %% CT callback: suite-wide setup.
  %%
  %% Reads the PostgreSQL endpoint from PGSQL_TCP_HOST / PGSQL_TCP_PORT
  %% (defaulting to "toxiproxy" and "5432"). When the endpoint is reachable,
  %% starts the required applications, creates the default API app, creates
  %% the test table and returns the extended config. When it is not
  %% reachable, the suite is skipped, except when IS_CI is "yes", in which
  %% case `no_postgres' is thrown so the run fails instead of skipping.
  init_per_suite(Config) ->
      Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
      Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
      case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
          false ->
              case os:getenv("IS_CI") of
                  "yes" -> throw(no_postgres);
                  _ -> {skip, no_postgres}
              end;
          true ->
              AppSpecs = [
                  emqx,
                  emqx_conf,
                  emqx_connector,
                  emqx_bridge,
                  emqx_bridge_pgsql,
                  emqx_rule_engine,
                  emqx_management,
                  {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
              ],
              StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
              Apps = emqx_cth_suite:start(AppSpecs, StartOpts),
              {ok, Api} = emqx_common_test_http:create_default_app(),
              %% The endpoint is stored under both the pgsql_* and postgres_*
              %% keys; connector_config/2 in this module reads the postgres_*
              %% ones.
              SuiteConfig =
                  [
                      {apps, Apps},
                      {api, Api},
                      {pgsql_host, Host},
                      {pgsql_port, Port},
                      {enable_tls, false},
                      {postgres_host, Host},
                      {postgres_port, Port}
                  ] ++ Config,
              emqx_bridge_pgsql_SUITE:connect_and_create_table(SuiteConfig),
              SuiteConfig
      end.
  %% CT callback: stops the applications recorded under `apps' by
  %% init_per_suite/1.
  end_per_suite(Config) ->
      emqx_cth_suite:stop(?config(apps, Config)),
      ok.
  %% CT callback: per-group setup.
  %%
  %% - postgres / timescale / matrix: record the bridge and connector type
  %%   for the group (see group_to_type/1).
  %% - batch_enabled / batch_disabled: record the batch size and batch time
  %%   that test cases read back from the config.
  %% - any other group: config is returned untouched.
  init_per_group(Group, Config) when
      Group =:= postgres orelse Group =:= timescale orelse Group =:= matrix
  ->
      Type = group_to_type(Group),
      [{bridge_type, Type}, {connector_type, Type}] ++ Config;
  init_per_group(batch_enabled, Config) ->
      [{batch_size, 10}, {batch_time, <<"10ms">>}] ++ Config;
  init_per_group(batch_disabled, Config) ->
      [{batch_size, 1}, {batch_time, <<"0ms">>}] ++ Config;
  init_per_group(_Group, Config) ->
      Config.
  %% Maps a CT group name to the bridge/connector type atom: `postgres'
  %% becomes `pgsql', every other group name is used as the type as-is.
  group_to_type(Group) ->
      case Group of
          postgres -> pgsql;
          _ -> Group
      end.
  %% CT callback: groups need no teardown.
  end_per_group(_, _) ->
      ok.
  %% CT callback: per-test-case setup.
  %%
  %% Sets a 60 s timetrap, removes leftover bridges/connectors and override
  %% config files, and derives a unique resource name from the test case name.
  %% The password is written to a file under priv_dir; connector_config/2
  %% references that file via a "file://" value. Finally a snabbkaffe trace is
  %% started and the connector/bridge names, types and configs are added to
  %% the returned config.
  init_per_testcase(TestCase, Config) ->
      ct:timetrap(timer:seconds(60)),
      emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
      emqx_config:delete_override_conf_files(),
      Suffix = integer_to_binary(erlang:unique_integer()),
      Name = <<(atom_to_binary(TestCase))/binary, Suffix/binary>>,
      Password = <<"public">>,
      Passfile = filename:join(?config(priv_dir, Config), "passfile"),
      ok = file:write_file(Passfile, Password),
      CredsConfig =
          [
              {postgres_username, <<"root">>},
              {postgres_password, Password},
              {postgres_passfile, Passfile}
          ] ++ Config,
      ConnectorConfig = connector_config(Name, CredsConfig),
      %% The connector and the bridge share the same name.
      BridgeConfig = bridge_config(Name, Name),
      ok = snabbkaffe:start_trace(),
      %% Types set by init_per_group/2 win; otherwise fall back to pgsql.
      ConnectorType = proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE),
      BridgeType = proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE),
      [
          {connector_type, ConnectorType},
          {connector_name, Name},
          {connector_config, ConnectorConfig},
          {bridge_type, BridgeType},
          {bridge_name, Name},
          {bridge_config, BridgeConfig}
      ] ++ CredsConfig.
  %% CT callback: per-test-case teardown.
  %%
  %% Unless `skip_does_not_apply' is set in the config, clears the test
  %% table, removes all bridges and connectors, runs the janitor callbacks
  %% (60 s limit) and stops the snabbkaffe trace.
  end_per_testcase(_Testcase, Config) ->
      case proplists:get_bool(skip_does_not_apply, Config) of
          false ->
              emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
              emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
              emqx_common_test_helpers:call_janitor(60_000),
              ok = snabbkaffe:stop();
          true ->
              ok
      end.
  151. %%------------------------------------------------------------------------------
  152. %% Helper fns
  153. %%------------------------------------------------------------------------------
  %% Builds the raw connector config for `Name' from the endpoint and
  %% credentials stored in the CT config. The password is not embedded; it is
  %% given as a "file://" reference to the passfile. The map is passed
  %% through serde_roundtrip/1 and then through
  %% parse_and_check_connector_config/2, whose result is returned.
  connector_config(Name, Config) ->
      Host = ?config(postgres_host, Config),
      Port = ?config(postgres_port, Config),
      Username = ?config(postgres_username, Config),
      PassFile = ?config(postgres_passfile, Config),
      Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
      PasswordRef = iolist_to_binary(["file://", PassFile]),
      ResourceOpts = #{
          <<"health_check_interval">> => <<"15s">>,
          <<"start_after_created">> => true,
          <<"start_timeout">> => <<"5s">>
      },
      RawConfig = #{
          <<"enable">> => true,
          <<"database">> => <<"mqtt">>,
          <<"server">> => Server,
          <<"pool_size">> => 8,
          <<"username">> => Username,
          <<"password">> => PasswordRef,
          <<"resource_opts">> => ResourceOpts
      },
      parse_and_check_connector_config(serde_roundtrip(RawConfig), Name).
  %% Validates `InnerConfigMap' against the connector schema by nesting it
  %% under connectors.<type>.<Name> and running it through
  %% hocon_tconf:check_plain/3. The checked config is only logged; the
  %% function returns the raw `InnerConfigMap' it was given. Crashes with a
  %% badmatch if the checked result lacks the expected nesting.
  parse_and_check_connector_config(InnerConfigMap, Name) ->
      RawConf = #{<<"connectors">> => #{?CONNECTOR_TYPE_BIN => #{Name => InnerConfigMap}}},
      CheckOpts = #{required => false, atom_key => false},
      Checked = hocon_tconf:check_plain(emqx_connector_schema, RawConf, CheckOpts),
      #{<<"connectors">> := #{?CONNECTOR_TYPE_BIN := #{Name := Config}}} = Checked,
      ct:pal("parsed config: ~p", [Config]),
      InnerConfigMap.
  %% Builds the raw action (bridge) config for `Name', attached to connector
  %% `ConnectorId'. Defaults to unbatched, synchronous queries; test cases
  %% override resource_opts where needed. The map is passed through
  %% serde_roundtrip/1 and then through parse_and_check_bridge_config/2,
  %% whose result is returned.
  bridge_config(Name, ConnectorId) ->
      Parameters = #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
      ResourceOpts = #{
          <<"batch_size">> => 1,
          <<"batch_time">> => <<"0ms">>,
          <<"buffer_mode">> => <<"memory_only">>,
          <<"buffer_seg_bytes">> => <<"10MB">>,
          <<"health_check_interval">> => <<"15s">>,
          <<"inflight_window">> => 100,
          <<"max_buffer_bytes">> => <<"256MB">>,
          <<"metrics_flush_interval">> => <<"1s">>,
          <<"query_mode">> => <<"sync">>,
          <<"request_ttl">> => <<"45s">>,
          <<"resume_interval">> => <<"15s">>,
          <<"worker_pool_size">> => <<"1">>
      },
      RawConfig = #{
          <<"enable">> => true,
          <<"connector">> => ConnectorId,
          <<"parameters">> => Parameters,
          <<"local_topic">> => <<"t/postgres">>,
          <<"resource_opts">> => ResourceOpts
      },
      parse_and_check_bridge_config(serde_roundtrip(RawConfig), Name).
  %% Pretty-prints the config map as HOCON and parses the output back,
  %% returning the parsed map. Crashes with a badmatch if the printed text
  %% does not parse, which is how serialization problems surface.
  serde_roundtrip(InnerConfigMap0) ->
      {ok, Parsed} = hocon:binary(hocon_pp:do(InnerConfigMap0, #{})),
      Parsed.
  %% Hands the action config to the shared testlib checker, using this
  %% suite's bridge type, and returns whatever the checker returns.
  parse_and_check_bridge_config(InnerConfigMap, Name) ->
      TypeBin = ?BRIDGE_TYPE_BIN,
      emqx_bridge_v2_testlib:parse_and_check(TypeBin, Name, InnerConfigMap).
  %% Returns a message map with a freshly generated GUID (hex string) for
  %% both `clientid' and `payload', and a fixed timestamp.
  make_message() ->
      NewGuidHex = fun() -> emqx_guid:to_hexstr(emqx_guid:gen()) end,
      ClientId = NewGuidHex(),
      Payload = NewGuidHex(),
      #{
          timestamp => 1668602148000,
          payload => Payload,
          clientid => ClientId
      }.
  224. %%------------------------------------------------------------------------------
  225. %% Testcases
  226. %%------------------------------------------------------------------------------
  %% Runs the shared start/stop scenario from the bridge v2 testlib.
  %% NOTE(review): `postgres_stopped' is presumably the trace point the
  %% connector emits on stop - confirm against the testlib.
  t_start_stop(Config) ->
      StopTracePoint = postgres_stopped,
      emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint),
      ok.
  %% Runs the shared create-via-HTTP scenario from the bridge v2 testlib;
  %% its return value is ignored.
  t_create_via_http(Config) ->
      _ = emqx_bridge_v2_testlib:t_create_via_http(Config),
      ok.
  %% Runs the shared status-check scenario from the bridge v2 testlib,
  %% passing `connecting' as the failure status option.
  t_on_get_status(Config) ->
      Opts = #{failure_status => connecting},
      emqx_bridge_v2_testlib:t_on_get_status(Config, Opts),
      ok.
  %% Runs the shared sync-query scenario from the bridge v2 testlib with
  %% messages from make_message/0; the query result must match {ok, _}.
  t_sync_query(Config) ->
      CheckResult = fun(Res) -> ?assertMatch({ok, _}, Res) end,
      ok = emqx_bridge_v2_testlib:t_sync_query(
          Config,
          fun make_message/0,
          CheckResult,
          postgres_bridge_connector_on_query_return
      ).
  %% Runs the shared "start action/source while its connector is disabled"
  %% scenario from the bridge v2 testlib and requires it to return ok.
  t_start_action_or_source_with_disabled_connector(Config) ->
      ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config).
  %% Matrix test: runs the connector with `disable_prepared_statements' set
  %% to true, for each backend group (postgres, timescale, matrix) with
  %% batching both disabled and enabled.
  %%
  %% The `matrix' clause returns the group combinations. The main clause:
  %%   1. overrides the connector and action configs prepared by
  %%      init_per_testcase/2 (async query mode, the group's batch settings);
  %%   2. creates the bridge and a rule over HTTP, waits for the resource to
  %%      report `connected', then publishes `batch_size' messages;
  %%   3. when batching is enabled, blocks until a successful batch result
  %%      with `row_count' equal to the batch size is traced;
  %%   4. re-runs the shared status and create-via-HTTP scenarios with the
  %%      same overridden config.
  t_disable_prepared_statements(matrix) ->
      [
          [postgres, batch_disabled],
          [postgres, batch_enabled],
          [timescale, batch_disabled],
          [timescale, batch_enabled],
          [matrix, batch_disabled],
          [matrix, batch_enabled]
      ];
  t_disable_prepared_statements(Config0) ->
      BatchSize = ?config(batch_size, Config0),
      BatchTime = ?config(batch_time, Config0),
      ConnectorConfig0 = ?config(connector_config, Config0),
      ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
      BridgeConfig0 = ?config(bridge_config, Config0),
      BridgeConfig = emqx_utils_maps:deep_merge(
          BridgeConfig0,
          #{
              <<"resource_opts">> => #{
                  <<"batch_size">> => BatchSize,
                  <<"batch_time">> => BatchTime,
                  <<"query_mode">> => <<"async">>
              }
          }
      ),
      Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
      Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}),
      %% NOTE(review): the 5 s timetrap below looks shorter than the 20 x 1 s
      %% retry budget of the health check inside the block - confirm which
      %% bound is intended.
      ?check_trace(
          #{timetrap => 5_000},
          begin
              ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
              RuleTopic = <<"t/postgres">>,
              Type = ?config(bridge_type, Config),
              {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
              ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
              ),
              {ok, C} = emqtt:start_link(),
              %% Stop the client when the janitor runs in end_per_testcase/2,
              %% so it does not stay connected into later test cases. A client
              %% that is already gone is not an error.
              emqx_common_test_helpers:on_exit(fun() ->
                  try
                      emqtt:stop(C)
                  catch
                      exit:{noproc, _} -> ok
                  end
              end),
              {ok, _} = emqtt:connect(C),
              lists:foreach(
                  fun(N) ->
                      emqtt:publish(C, RuleTopic, integer_to_binary(N))
                  end,
                  lists:seq(1, BatchSize)
              ),
              case BatchSize > 1 of
                  true ->
                      ?block_until(#{
                          ?snk_kind := "postgres_success_batch_result",
                          row_count := BatchSize
                      }),
                      ok;
                  false ->
                      ok
              end,
              ok
          end,
          []
      ),
      emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
      ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
      emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
      ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
      ok.