emqx_bridge_v2_pgsql_SUITE.erl 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_pgsql_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -define(BRIDGE_TYPE, pgsql).
  21. -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
  22. -define(CONNECTOR_TYPE, pgsql).
  23. -define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
  24. -import(emqx_common_test_helpers, [on_exit/1]).
  25. -import(emqx_utils_conv, [bin/1]).
  26. %%------------------------------------------------------------------------------
  27. %% CT boilerplate
  28. %%------------------------------------------------------------------------------
  %% Common Test callback: the list of test cases is whatever
  %% emqx_common_test_helpers:all/1 returns for this suite module.
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Suite-level setup.
  %%
  %% The PostgreSQL endpoint is taken from the `PGSQL_TCP_HOST' and
  %% `PGSQL_TCP_PORT' environment variables, falling back to "toxiproxy" and
  %% "5432". When emqx_common_test_helpers:is_tcp_server_available/2 reports
  %% `true' for that endpoint, the applications listed below are started, the
  %% default HTTP API app is created, and
  %% emqx_bridge_pgsql_SUITE:connect_and_create_table/1 is called with the
  %% extended config, which is then returned.
  %%
  %% When the endpoint is reported unavailable, the outcome depends on the
  %% `IS_CI' environment variable (see postgres_unavailable/1).
  init_per_suite(Config) ->
      Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
      Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
      case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
          false ->
              postgres_unavailable(os:getenv("IS_CI"));
          true ->
              AppSpecs = [
                  emqx,
                  emqx_conf,
                  emqx_connector,
                  emqx_bridge,
                  emqx_bridge_pgsql,
                  emqx_rule_engine,
                  emqx_management,
                  {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
              ],
              WorkDir = emqx_cth_suite:work_dir(Config),
              Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
              {ok, Api} = emqx_common_test_http:create_default_app(),
              %% The host/port pair is stored under both the `pgsql_*' and the
              %% `postgres_*' keys.
              SuiteProps = [
                  {apps, Apps},
                  {api, Api},
                  {pgsql_host, Host},
                  {pgsql_port, Port},
                  {enable_tls, false},
                  {postgres_host, Host},
                  {postgres_port, Port}
              ],
              SuiteConfig = SuiteProps ++ Config,
              emqx_bridge_pgsql_SUITE:connect_and_create_table(SuiteConfig),
              SuiteConfig
      end.

  %% Outcome of suite setup when PostgreSQL is unavailable: throw
  %% `no_postgres' when `IS_CI' is exactly "yes", otherwise ask Common Test
  %% to skip the suite.
  postgres_unavailable("yes") ->
      throw(no_postgres);
  postgres_unavailable(_NotCI) ->
      {skip, no_postgres}.
  %% Suite-level teardown: passes the value stored under `apps' in Config
  %% (put there by init_per_suite/1) to emqx_cth_suite:stop/1.
  end_per_suite(Config) ->
      emqx_cth_suite:stop(?config(apps, Config)),
      ok.
  %% Per-testcase setup; all the work is done by common_init_per_testcase/2.
  init_per_testcase(Case, Config0) ->
      common_init_per_testcase(Case, Config0).
  %% Shared per-testcase setup.
  %%
  %% Sets a 60 second timetrap, deletes all bridges/connectors and the
  %% override conf files, then derives a name from the test case atom plus
  %% erlang:unique_integer/0. The password is written to a "passfile" under
  %% the test case's `priv_dir'. The connector and bridge configs are built
  %% with that name (the bridge refers to the connector by the same name), a
  %% snabbkaffe trace is started, and the returned proplist is Config
  %% extended with the credentials and the connector/bridge entries.
  common_init_per_testcase(TestCase, Config) ->
      ct:timetrap(timer:seconds(60)),
      emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
      emqx_config:delete_override_conf_files(),
      Suffix = integer_to_binary(erlang:unique_integer()),
      Name = <<(atom_to_binary(TestCase))/binary, Suffix/binary>>,
      Password = <<"public">>,
      Passfile = filename:join(?config(priv_dir, Config), "passfile"),
      ok = file:write_file(Passfile, Password),
      Credentials = [
          {postgres_username, <<"root">>},
          {postgres_password, Password},
          {postgres_passfile, Passfile}
      ],
      WithCredentials = Credentials ++ Config,
      ConnectorConfig = connector_config(Name, WithCredentials),
      BridgeConfig = bridge_config(Name, Name),
      ok = snabbkaffe:start_trace(),
      TestcaseProps = [
          {connector_type, ?CONNECTOR_TYPE},
          {connector_name, Name},
          {connector_config, ConnectorConfig},
          {bridge_type, ?BRIDGE_TYPE},
          {bridge_name, Name},
          {bridge_config, BridgeConfig}
      ],
      TestcaseProps ++ WithCredentials.
  %% Per-testcase teardown.
  %%
  %% Does nothing when `skip_does_not_apply' is set to true in Config.
  %% Otherwise it calls emqx_bridge_pgsql_SUITE:connect_and_clear_table/1,
  %% deletes all bridges and connectors, calls the janitor with 60_000
  %% (presumably a timeout in milliseconds; confirm against
  %% emqx_common_test_helpers) and stops snabbkaffe.
  end_per_testcase(_Testcase, Config) ->
      SkipCleanup = proplists:get_bool(skip_does_not_apply, Config),
      case SkipCleanup of
          false ->
              emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
              emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
              emqx_common_test_helpers:call_janitor(60_000),
              ok = snabbkaffe:stop(),
              ok;
          true ->
              ok
      end.
  115. %%------------------------------------------------------------------------------
  116. %% Helper fns
  117. %%------------------------------------------------------------------------------
  %% Builds the raw connector config for the connector called Name.
  %%
  %% Host, port, username and passfile path are read from Config
  %% (`postgres_host', `postgres_port', `postgres_username',
  %% `postgres_passfile'). The password is not inlined: it is given as a
  %% "file://" reference to the passfile. The map goes through
  %% serde_roundtrip/1 and is then handed to
  %% parse_and_check_connector_config/2, whose result is returned.
  connector_config(Name, Config) ->
      Host = ?config(postgres_host, Config),
      Port = ?config(postgres_port, Config),
      Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
      PasswordRef = iolist_to_binary(["file://", ?config(postgres_passfile, Config)]),
      ResourceOpts = #{
          <<"health_check_interval">> => <<"15s">>,
          <<"start_after_created">> => true,
          <<"start_timeout">> => <<"5s">>
      },
      RawConnector = #{
          <<"enable">> => true,
          <<"server">> => Server,
          <<"database">> => <<"mqtt">>,
          <<"username">> => ?config(postgres_username, Config),
          <<"password">> => PasswordRef,
          <<"pool_size">> => 8,
          <<"resource_opts">> => ResourceOpts
      },
      parse_and_check_connector_config(serde_roundtrip(RawConnector), Name).
  %% Checks InnerConfigMap against `emqx_connector_schema' by nesting it
  %% under connectors.<type>.<Name> and running hocon_tconf:check_plain/3.
  %% The match on the result asserts that the same path is present in the
  %% checked output. The parsed value is only logged; the function returns
  %% the raw InnerConfigMap it was given.
  parse_and_check_connector_config(InnerConfigMap, Name) ->
      RawConf = #{<<"connectors">> => #{?CONNECTOR_TYPE_BIN => #{Name => InnerConfigMap}}},
      CheckOpts = #{required => false, atom_key => false},
      Checked = hocon_tconf:check_plain(emqx_connector_schema, RawConf, CheckOpts),
      #{<<"connectors">> := #{?CONNECTOR_TYPE_BIN := #{Name := Parsed}}} = Checked,
      ct:pal("parsed config: ~p", [Parsed]),
      InnerConfigMap.
  %% Builds the raw bridge config for the bridge called Name, attached to
  %% the connector ConnectorId. The SQL comes from
  %% emqx_bridge_pgsql_SUITE:default_sql/0. The map goes through
  %% serde_roundtrip/1 and is then handed to parse_and_check_bridge_config/2,
  %% whose result is returned.
  bridge_config(Name, ConnectorId) ->
      Parameters = #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
      ResourceOpts = #{
          <<"query_mode">> => <<"sync">>,
          <<"request_ttl">> => <<"45s">>,
          <<"worker_pool_size">> => <<"1">>,
          <<"inflight_window">> => 100,
          <<"batch_size">> => 1,
          <<"batch_time">> => <<"0ms">>,
          <<"buffer_mode">> => <<"memory_only">>,
          <<"buffer_seg_bytes">> => <<"10MB">>,
          <<"max_buffer_bytes">> => <<"256MB">>,
          <<"health_check_interval">> => <<"15s">>,
          <<"resume_interval">> => <<"15s">>,
          <<"metrics_flush_interval">> => <<"1s">>
      },
      RawBridge = #{
          <<"enable">> => true,
          <<"connector">> => ConnectorId,
          <<"local_topic">> => <<"t/postgres">>,
          <<"parameters">> => Parameters,
          <<"resource_opts">> => ResourceOpts
      },
      parse_and_check_bridge_config(serde_roundtrip(RawBridge), Name).
  %% Serialization check: pretty-prints the map with hocon_pp:do/2, parses
  %% the output back with hocon:binary/1 and returns the re-parsed map.
  %% Crashes with badmatch if parsing does not return `{ok, _}'.
  serde_roundtrip(InnerConfigMap0) ->
      Printed = hocon_pp:do(InnerConfigMap0, #{}),
      {ok, Reparsed} = hocon:binary(Printed),
      Reparsed.
  %% Hands the raw bridge config to emqx_bridge_v2_testlib:parse_and_check/3
  %% for this suite's bridge type and returns that call's result.
  parse_and_check_bridge_config(InnerConfigMap, Name) ->
      BridgeType = ?BRIDGE_TYPE_BIN,
      emqx_bridge_v2_testlib:parse_and_check(BridgeType, Name, InnerConfigMap).
  %% Produces a message map with a freshly generated client id and payload
  %% (each the hex string of a new emqx_guid) and a fixed timestamp.
  make_message() ->
      HexGuid = fun() -> emqx_guid:to_hexstr(emqx_guid:gen()) end,
      %% Client id is generated before the payload, as in the original order.
      ClientId = HexGuid(),
      Payload = HexGuid(),
      #{
          timestamp => 1668602148000,
          payload => Payload,
          clientid => ClientId
      }.
  188. %%------------------------------------------------------------------------------
  189. %% Testcases
  190. %%------------------------------------------------------------------------------
  %% Runs the shared start/stop scenario from emqx_bridge_v2_testlib.
  %% `postgres_stopped' is presumably the trace point the helper waits for
  %% on stop; confirm against emqx_bridge_v2_testlib:t_start_stop/2.
  t_start_stop(Config) ->
      StopTracePoint = postgres_stopped,
      emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint),
      ok.
  %% Runs the shared create-via-HTTP scenario from emqx_bridge_v2_testlib
  %% with this suite's per-testcase config. The helper's return value is
  %% ignored.
  t_create_via_http(Config) ->
      _ = emqx_bridge_v2_testlib:t_create_via_http(Config),
      ok.
  %% Runs the shared on_get_status scenario from emqx_bridge_v2_testlib,
  %% passing `connecting' as the `failure_status' option.
  t_on_get_status(Config) ->
      Opts = #{failure_status => connecting},
      emqx_bridge_v2_testlib:t_on_get_status(Config, Opts),
      ok.
  %% Runs the shared sync-query scenario from emqx_bridge_v2_testlib.
  %% Messages come from make_message/0, and the result handed to the check
  %% fun must match `{ok, _}'. The last argument is presumably the trace
  %% point the helper waits on; confirm against
  %% emqx_bridge_v2_testlib:t_sync_query/4.
  t_sync_query(Config) ->
      MakeMessage = fun make_message/0,
      CheckResult = fun(Res) -> ?assertMatch({ok, _}, Res) end,
      TracePoint = postgres_bridge_connector_on_query_return,
      ok = emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessage, CheckResult, TracePoint),
      ok.