%%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_v2_pgsql_SUITE). -compile(nowarn_export_all). -compile(export_all). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, pgsql). -define(BRIDGE_TYPE_BIN, <<"pgsql">>). -define(CONNECTOR_TYPE, pgsql). -define(CONNECTOR_TYPE_BIN, <<"pgsql">>). -import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_utils_conv, [bin/1]). %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ all() -> All0 = emqx_common_test_helpers:all(?MODULE), All = All0 -- matrix_cases(), Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), Groups ++ All. matrix_cases() -> [ t_disable_prepared_statements ]. groups() -> emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). init_per_suite(Config) -> PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")), case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of true -> Apps = emqx_cth_suite:start( [ emqx, emqx_conf, emqx_connector, emqx_bridge, emqx_bridge_pgsql, emqx_rule_engine, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), NConfig = [ {apps, Apps}, {pgsql_host, PostgresHost}, {pgsql_port, PostgresPort}, {enable_tls, false}, {postgres_host, PostgresHost}, {postgres_port, PostgresPort} | Config ], emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig), NConfig; false -> case os:getenv("IS_CI") of "yes" -> throw(no_postgres); _ -> {skip, no_postgres} end end. end_per_suite(Config) -> Apps = ?config(apps, Config), emqx_cth_suite:stop(Apps), ok. init_per_group(Group, Config) when Group =:= postgres; Group =:= timescale; Group =:= matrix -> [ {bridge_type, group_to_type(Group)}, {connector_type, group_to_type(Group)} | Config ]; init_per_group(batch_enabled, Config) -> [ {batch_size, 10}, {batch_time, <<"10ms">>} | Config ]; init_per_group(batch_disabled, Config) -> [ {batch_size, 1}, {batch_time, <<"0ms">>} | Config ]; init_per_group(_Group, Config) -> Config. group_to_type(postgres) -> pgsql; group_to_type(Group) -> Group. end_per_group(_Group, _Config) -> ok. init_per_testcase(TestCase, Config) -> ct:timetrap(timer:seconds(60)), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_config:delete_override_conf_files(), UniqueNum = integer_to_binary(erlang:unique_integer()), Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]), Username = <<"root">>, Password = <<"public">>, Passfile = filename:join(?config(priv_dir, Config), "passfile"), ok = file:write_file(Passfile, Password), NConfig = [ {postgres_username, Username}, {postgres_password, Password}, {postgres_passfile, Passfile} | Config ], ConnectorConfig = connector_config(Name, NConfig), BridgeConfig = bridge_config(Name, Name), ok = snabbkaffe:start_trace(), [ {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)}, {connector_name, Name}, {connector_config, ConnectorConfig}, {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)}, {bridge_name, Name}, {bridge_config, BridgeConfig} | NConfig ]. end_per_testcase(_Testcase, Config) -> case proplists:get_bool(skip_does_not_apply, Config) of true -> ok; false -> emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_common_test_helpers:call_janitor(60_000), ok = snabbkaffe:stop(), ok end. %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ connector_config(Name, Config) -> PostgresHost = ?config(postgres_host, Config), PostgresPort = ?config(postgres_port, Config), Username = ?config(postgres_username, Config), PassFile = ?config(postgres_passfile, Config), InnerConfigMap0 = #{ <<"enable">> => true, <<"database">> => <<"mqtt">>, <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]), <<"pool_size">> => 8, <<"username">> => Username, <<"password">> => iolist_to_binary(["file://", PassFile]), <<"resource_opts">> => #{ <<"health_check_interval">> => <<"15s">>, <<"start_after_created">> => true, <<"start_timeout">> => <<"5s">> } }, InnerConfigMap = serde_roundtrip(InnerConfigMap0), emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap). bridge_config(Name, ConnectorId) -> InnerConfigMap0 = #{ <<"enable">> => true, <<"connector">> => ConnectorId, <<"parameters">> => #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()}, <<"local_topic">> => <<"t/postgres">>, <<"resource_opts">> => #{ <<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>, <<"buffer_mode">> => <<"memory_only">>, <<"buffer_seg_bytes">> => <<"10MB">>, <<"health_check_interval">> => <<"15s">>, <<"inflight_window">> => 100, <<"max_buffer_bytes">> => <<"256MB">>, <<"metrics_flush_interval">> => <<"1s">>, <<"query_mode">> => <<"sync">>, <<"request_ttl">> => <<"45s">>, <<"resume_interval">> => <<"15s">>, <<"worker_pool_size">> => <<"1">> } }, InnerConfigMap = serde_roundtrip(InnerConfigMap0), parse_and_check_bridge_config(InnerConfigMap, Name). %% check it serializes correctly serde_roundtrip(InnerConfigMap0) -> IOList = hocon_pp:do(InnerConfigMap0, #{}), {ok, InnerConfigMap} = hocon:binary(IOList), InnerConfigMap. parse_and_check_bridge_config(InnerConfigMap, Name) -> emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap). make_message() -> ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), #{ clientid => ClientId, payload => Payload, timestamp => 1668602148000 }. %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped), ok. t_create_via_http(Config) -> emqx_bridge_v2_testlib:t_create_via_http(Config), ok. t_on_get_status(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. t_sync_query(Config) -> ok = emqx_bridge_v2_testlib:t_sync_query( Config, fun make_message/0, fun(Res) -> ?assertMatch({ok, _}, Res) end, postgres_bridge_connector_on_query_return ), ok. t_start_action_or_source_with_disabled_connector(Config) -> ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok. t_disable_prepared_statements(matrix) -> [ [postgres, batch_disabled], [postgres, batch_enabled], [timescale, batch_disabled], [timescale, batch_enabled], [matrix, batch_disabled], [matrix, batch_enabled] ]; t_disable_prepared_statements(Config0) -> BatchSize = ?config(batch_size, Config0), BatchTime = ?config(batch_time, Config0), ConnectorConfig0 = ?config(connector_config, Config0), ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}), BridgeConfig0 = ?config(bridge_config, Config0), BridgeConfig = emqx_utils_maps:deep_merge( BridgeConfig0, #{ <<"resource_opts">> => #{ <<"batch_size">> => BatchSize, <<"batch_time">> => BatchTime, <<"query_mode">> => <<"async">> } } ), Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}), ?check_trace( #{timetrap => 5_000}, begin ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)), RuleTopic = <<"t/postgres">>, Type = ?config(bridge_type, Config), {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config), ResourceId = emqx_bridge_v2_testlib:resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), {ok, C} = emqtt:start_link(), {ok, _} = emqtt:connect(C), lists:foreach( fun(N) -> emqtt:publish(C, RuleTopic, integer_to_binary(N)) end, lists:seq(1, BatchSize) ), case BatchSize > 1 of true -> ?block_until(#{ ?snk_kind := "postgres_success_batch_result", row_count := BatchSize }), ok; false -> ok end, ok end, [] ), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = emqx_bridge_v2_testlib:t_create_via_http(Config), ok. t_update_with_invalid_prepare(Config) -> ConnectorName = ?config(connector_name, Config), BridgeName = ?config(bridge_name, Config), {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config), %% arrivedx is a bad column name BadSQL = << "INSERT INTO mqtt_test(payload, arrivedx) " "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" >>, Override = #{<<"parameters">> => #{<<"sql">> => BadSQL}}, {ok, {{_, 200, "OK"}, _Headers1, Body1}} = emqx_bridge_v2_testlib:update_bridge_api(Config, Override), ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1), Error1 = maps:get(<<"error">>, Body1), case re:run(Error1, <<"undefined_column">>, [{capture, none}]) of match -> ok; nomatch -> ct:fail(#{ expected_pattern => "undefined_column", got => Error1 }) end, %% assert that although there was an error returned, the invliad SQL is actually put C1 = [{action_name, BridgeName}, {action_type, pgsql} | Config], {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1), #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action, ?assertEqual(FetchedSQL, BadSQL), %% update again with the original sql {ok, {{_, 200, "OK"}, _Headers2, Body2}} = emqx_bridge_v2_testlib:update_bridge_api(Config, #{}), %% the error should be gone now, and status should be 'connected' ?assertMatch(#{<<"error">> := <<>>, <<"status">> := <<"connected">>}, Body2), %% finally check if ecpool worker should have exactly one of reconnect callback ConnectorResId = <<"connector:pgsql:", ConnectorName/binary>>, Workers = ecpool:workers(ConnectorResId), [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers), lists:foreach( fun(Pid) -> [{emqx_postgresql, prepare_sql_to_conn, Args}] = ecpool_worker:get_reconnect_callbacks(Pid), Sig = emqx_postgresql:get_reconnect_callback_signature(Args), BridgeResId = <<"action:pgsql:", BridgeName/binary, $:, ConnectorResId/binary>>, ?assertEqual(BridgeResId, Sig) end, WorkerPids ), ok.