emqx_bridge_v2_pgsql_SUITE.erl 9.8 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_pgsql_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(BRIDGE_TYPE, pgsql).
  22. -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
  23. -define(CONNECTOR_TYPE, pgsql).
  24. -define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
  25. -import(emqx_common_test_helpers, [on_exit/1]).
  26. -import(emqx_utils_conv, [bin/1]).
  27. %%------------------------------------------------------------------------------
  28. %% CT boilerplate
  29. %%------------------------------------------------------------------------------
  30. all() ->
  31. All0 = emqx_common_test_helpers:all(?MODULE),
  32. All = All0 -- matrix_cases(),
  33. Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
  34. Groups ++ All.
  35. matrix_cases() ->
  36. [
  37. t_disable_prepared_statements
  38. ].
  39. groups() ->
  40. emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
  41. init_per_suite(Config) ->
  42. PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
  43. PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
  44. case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of
  45. true ->
  46. Apps = emqx_cth_suite:start(
  47. [
  48. emqx,
  49. emqx_conf,
  50. emqx_connector,
  51. emqx_bridge,
  52. emqx_bridge_pgsql,
  53. emqx_rule_engine,
  54. emqx_management,
  55. {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
  56. ],
  57. #{work_dir => emqx_cth_suite:work_dir(Config)}
  58. ),
  59. {ok, Api} = emqx_common_test_http:create_default_app(),
  60. NConfig = [
  61. {apps, Apps},
  62. {api, Api},
  63. {pgsql_host, PostgresHost},
  64. {pgsql_port, PostgresPort},
  65. {enable_tls, false},
  66. {postgres_host, PostgresHost},
  67. {postgres_port, PostgresPort}
  68. | Config
  69. ],
  70. emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig),
  71. NConfig;
  72. false ->
  73. case os:getenv("IS_CI") of
  74. "yes" ->
  75. throw(no_postgres);
  76. _ ->
  77. {skip, no_postgres}
  78. end
  79. end.
  80. end_per_suite(Config) ->
  81. Apps = ?config(apps, Config),
  82. emqx_cth_suite:stop(Apps),
  83. ok.
  84. init_per_group(Group, Config) when
  85. Group =:= postgres;
  86. Group =:= timescale;
  87. Group =:= matrix
  88. ->
  89. [
  90. {bridge_type, group_to_type(Group)},
  91. {connector_type, group_to_type(Group)}
  92. | Config
  93. ];
  94. init_per_group(_Group, Config) ->
  95. Config.
  96. group_to_type(postgres) -> pgsql;
  97. group_to_type(Group) -> Group.
  98. end_per_group(_Group, _Config) ->
  99. ok.
  100. init_per_testcase(TestCase, Config) ->
  101. ct:timetrap(timer:seconds(60)),
  102. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  103. emqx_config:delete_override_conf_files(),
  104. UniqueNum = integer_to_binary(erlang:unique_integer()),
  105. Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
  106. Username = <<"root">>,
  107. Password = <<"public">>,
  108. Passfile = filename:join(?config(priv_dir, Config), "passfile"),
  109. ok = file:write_file(Passfile, Password),
  110. NConfig = [
  111. {postgres_username, Username},
  112. {postgres_password, Password},
  113. {postgres_passfile, Passfile}
  114. | Config
  115. ],
  116. ConnectorConfig = connector_config(Name, NConfig),
  117. BridgeConfig = bridge_config(Name, Name),
  118. ok = snabbkaffe:start_trace(),
  119. [
  120. {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
  121. {connector_name, Name},
  122. {connector_config, ConnectorConfig},
  123. {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
  124. {bridge_name, Name},
  125. {bridge_config, BridgeConfig}
  126. | NConfig
  127. ].
  128. end_per_testcase(_Testcase, Config) ->
  129. case proplists:get_bool(skip_does_not_apply, Config) of
  130. true ->
  131. ok;
  132. false ->
  133. emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
  134. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  135. emqx_common_test_helpers:call_janitor(60_000),
  136. ok = snabbkaffe:stop(),
  137. ok
  138. end.
  139. %%------------------------------------------------------------------------------
  140. %% Helper fns
  141. %%------------------------------------------------------------------------------
  142. connector_config(Name, Config) ->
  143. PostgresHost = ?config(postgres_host, Config),
  144. PostgresPort = ?config(postgres_port, Config),
  145. Username = ?config(postgres_username, Config),
  146. PassFile = ?config(postgres_passfile, Config),
  147. InnerConfigMap0 =
  148. #{
  149. <<"enable">> => true,
  150. <<"database">> => <<"mqtt">>,
  151. <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]),
  152. <<"pool_size">> => 8,
  153. <<"username">> => Username,
  154. <<"password">> => iolist_to_binary(["file://", PassFile]),
  155. <<"resource_opts">> => #{
  156. <<"health_check_interval">> => <<"15s">>,
  157. <<"start_after_created">> => true,
  158. <<"start_timeout">> => <<"5s">>
  159. }
  160. },
  161. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  162. parse_and_check_connector_config(InnerConfigMap, Name).
  163. parse_and_check_connector_config(InnerConfigMap, Name) ->
  164. TypeBin = ?CONNECTOR_TYPE_BIN,
  165. RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
  166. #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
  167. hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
  168. required => false, atom_key => false
  169. }),
  170. ct:pal("parsed config: ~p", [Config]),
  171. InnerConfigMap.
  172. bridge_config(Name, ConnectorId) ->
  173. InnerConfigMap0 =
  174. #{
  175. <<"enable">> => true,
  176. <<"connector">> => ConnectorId,
  177. <<"parameters">> =>
  178. #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
  179. <<"local_topic">> => <<"t/postgres">>,
  180. <<"resource_opts">> => #{
  181. <<"batch_size">> => 1,
  182. <<"batch_time">> => <<"0ms">>,
  183. <<"buffer_mode">> => <<"memory_only">>,
  184. <<"buffer_seg_bytes">> => <<"10MB">>,
  185. <<"health_check_interval">> => <<"15s">>,
  186. <<"inflight_window">> => 100,
  187. <<"max_buffer_bytes">> => <<"256MB">>,
  188. <<"metrics_flush_interval">> => <<"1s">>,
  189. <<"query_mode">> => <<"sync">>,
  190. <<"request_ttl">> => <<"45s">>,
  191. <<"resume_interval">> => <<"15s">>,
  192. <<"worker_pool_size">> => <<"1">>
  193. }
  194. },
  195. InnerConfigMap = serde_roundtrip(InnerConfigMap0),
  196. parse_and_check_bridge_config(InnerConfigMap, Name).
  197. %% check it serializes correctly
  198. serde_roundtrip(InnerConfigMap0) ->
  199. IOList = hocon_pp:do(InnerConfigMap0, #{}),
  200. {ok, InnerConfigMap} = hocon:binary(IOList),
  201. InnerConfigMap.
  202. parse_and_check_bridge_config(InnerConfigMap, Name) ->
  203. emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
  204. make_message() ->
  205. ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
  206. Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
  207. #{
  208. clientid => ClientId,
  209. payload => Payload,
  210. timestamp => 1668602148000
  211. }.
  212. %%------------------------------------------------------------------------------
  213. %% Testcases
  214. %%------------------------------------------------------------------------------
  215. t_start_stop(Config) ->
  216. emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped),
  217. ok.
  218. t_create_via_http(Config) ->
  219. emqx_bridge_v2_testlib:t_create_via_http(Config),
  220. ok.
  221. t_on_get_status(Config) ->
  222. emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  223. ok.
  224. t_sync_query(Config) ->
  225. ok = emqx_bridge_v2_testlib:t_sync_query(
  226. Config,
  227. fun make_message/0,
  228. fun(Res) -> ?assertMatch({ok, _}, Res) end,
  229. postgres_bridge_connector_on_query_return
  230. ),
  231. ok.
  232. t_start_action_or_source_with_disabled_connector(Config) ->
  233. ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
  234. ok.
  235. t_disable_prepared_statements(matrix) ->
  236. [[postgres], [timescale], [matrix]];
  237. t_disable_prepared_statements(Config0) ->
  238. ConnectorConfig0 = ?config(connector_config, Config0),
  239. ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
  240. Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
  241. ok = emqx_bridge_v2_testlib:t_sync_query(
  242. Config,
  243. fun make_message/0,
  244. fun(Res) -> ?assertMatch({ok, _}, Res) end,
  245. postgres_bridge_connector_on_query_return
  246. ),
  247. ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
  248. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  249. ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
  250. ok.