| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_bridge_pgsql_SUITE).
- -compile(nowarn_export_all).
- -compile(export_all).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -include("emqx_resource_errors.hrl").
- % SQL definitions
- -define(SQL_BRIDGE,
- "INSERT INTO mqtt_test(payload, arrived) "
- "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
- ).
- -define(SQL_CREATE_TABLE,
- "CREATE TABLE IF NOT EXISTS mqtt_test (payload text, arrived timestamp NOT NULL) "
- ).
- -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
- -define(SQL_DELETE, "DELETE from mqtt_test").
- -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
- % DB defaults
- -define(PGSQL_DATABASE, "mqtt").
- -define(PGSQL_USERNAME, "root").
- -define(PGSQL_PASSWORD, "public").
- -define(BATCH_SIZE, 10).
- %%------------------------------------------------------------------------------
- %% CT boilerplate
- %%------------------------------------------------------------------------------
- all() ->
- [
- {group, tcp},
- {group, tls}
- ].
- groups() ->
- TCs = emqx_common_test_helpers:all(?MODULE),
- NonBatchCases = [t_write_timeout],
- BatchVariantGroups = [
- {group, with_batch},
- {group, without_batch},
- {group, matrix},
- {group, timescale}
- ],
- QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
- [
- {tcp, QueryModeGroups},
- {tls, QueryModeGroups},
- {async, BatchVariantGroups},
- {sync, BatchVariantGroups},
- {with_batch, TCs -- NonBatchCases},
- {without_batch, TCs},
- {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
- {timescale, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}
- ].
- init_per_group(tcp, Config) ->
- Host = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
- Port = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
- [
- {pgsql_host, Host},
- {pgsql_port, Port},
- {enable_tls, false},
- {proxy_name, "pgsql_tcp"}
- | Config
- ];
- init_per_group(tls, Config) ->
- Host = os:getenv("PGSQL_TLS_HOST", "toxiproxy"),
- Port = list_to_integer(os:getenv("PGSQL_TLS_PORT", "5433")),
- [
- {pgsql_host, Host},
- {pgsql_port, Port},
- {enable_tls, true},
- {proxy_name, "pgsql_tls"}
- | Config
- ];
- init_per_group(async, Config) ->
- [{query_mode, async} | Config];
- init_per_group(sync, Config) ->
- [{query_mode, sync} | Config];
- init_per_group(with_batch, Config0) ->
- Config = [{enable_batch, true} | Config0],
- common_init(Config);
- init_per_group(without_batch, Config0) ->
- Config = [{enable_batch, false} | Config0],
- common_init(Config);
- init_per_group(matrix, Config0) ->
- Config = [{bridge_type, <<"matrix">>}, {enable_batch, true} | Config0],
- common_init(Config);
- init_per_group(timescale, Config0) ->
- Config = [{bridge_type, <<"timescale">>}, {enable_batch, true} | Config0],
- common_init(Config);
- init_per_group(_Group, Config) ->
- Config.
- end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
- connect_and_drop_table(Config),
- ProxyHost = ?config(proxy_host, Config),
- ProxyPort = ?config(proxy_port, Config),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- ok;
- end_per_group(_Group, _Config) ->
- ok.
- init_per_suite(Config) ->
- Config.
- end_per_suite(_Config) ->
- emqx_mgmt_api_test_util:end_suite(),
- ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
- ok.
- init_per_testcase(_Testcase, Config) ->
- connect_and_clear_table(Config),
- delete_bridge(Config),
- snabbkaffe:start_trace(),
- Config.
- end_per_testcase(_Testcase, Config) ->
- ProxyHost = ?config(proxy_host, Config),
- ProxyPort = ?config(proxy_port, Config),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- connect_and_clear_table(Config),
- ok = snabbkaffe:stop(),
- delete_bridge(Config),
- ok.
- %%------------------------------------------------------------------------------
- %% Helper fns
- %%------------------------------------------------------------------------------
- common_init(Config0) ->
- BridgeType = proplists:get_value(bridge_type, Config0, <<"pgsql">>),
- Host = ?config(pgsql_host, Config0),
- Port = ?config(pgsql_port, Config0),
- case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
- true ->
- % Setup toxiproxy
- ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
- ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- % Ensure enterprise bridge module is loaded
- ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
- _ = emqx_bridge_enterprise:module_info(),
- emqx_mgmt_api_test_util:init_suite(),
- % Connect to pgsql directly and create the table
- connect_and_create_table(Config0),
- {Name, PGConf} = pgsql_config(BridgeType, Config0),
- Config =
- [
- {pgsql_config, PGConf},
- {pgsql_bridge_type, BridgeType},
- {pgsql_name, Name},
- {proxy_host, ProxyHost},
- {proxy_port, ProxyPort}
- | Config0
- ],
- Config;
- false ->
- case os:getenv("IS_CI") of
- "yes" ->
- throw(no_pgsql);
- _ ->
- {skip, no_pgsql}
- end
- end.
- pgsql_config(BridgeType, Config) ->
- Port = integer_to_list(?config(pgsql_port, Config)),
- Server = ?config(pgsql_host, Config) ++ ":" ++ Port,
- Name = atom_to_binary(?MODULE),
- BatchSize =
- case ?config(enable_batch, Config) of
- true -> ?BATCH_SIZE;
- false -> 1
- end,
- QueryMode = ?config(query_mode, Config),
- TlsEnabled = ?config(enable_tls, Config),
- ConfigString =
- io_lib:format(
- "bridges.~s.~s {\n"
- " enable = true\n"
- " server = ~p\n"
- " database = ~p\n"
- " username = ~p\n"
- " password = ~p\n"
- " sql = ~p\n"
- " resource_opts = {\n"
- " request_ttl = 500ms\n"
- " batch_size = ~b\n"
- " query_mode = ~s\n"
- " }\n"
- " ssl = {\n"
- " enable = ~w\n"
- " }\n"
- "}",
- [
- BridgeType,
- Name,
- Server,
- ?PGSQL_DATABASE,
- ?PGSQL_USERNAME,
- ?PGSQL_PASSWORD,
- ?SQL_BRIDGE,
- BatchSize,
- QueryMode,
- TlsEnabled
- ]
- ),
- {Name, parse_and_check(ConfigString, BridgeType, Name)}.
- parse_and_check(ConfigString, BridgeType, Name) ->
- {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
- hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
- #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
- Config.
- create_bridge(Config) ->
- create_bridge(Config, _Overrides = #{}).
- create_bridge(Config, Overrides) ->
- BridgeType = ?config(pgsql_bridge_type, Config),
- Name = ?config(pgsql_name, Config),
- PGConfig0 = ?config(pgsql_config, Config),
- PGConfig = emqx_utils_maps:deep_merge(PGConfig0, Overrides),
- emqx_bridge:create(BridgeType, Name, PGConfig).
- delete_bridge(Config) ->
- BridgeType = ?config(pgsql_bridge_type, Config),
- Name = ?config(pgsql_name, Config),
- emqx_bridge:remove(BridgeType, Name).
- create_bridge_http(Params) ->
- Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
- AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
- case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
- {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
- Error -> Error
- end.
- send_message(Config, Payload) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
- emqx_bridge:send_message(BridgeID, Payload).
- query_resource(Config, Request) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
- query_resource_sync(Config, Request) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
- query_resource_async(Config, Request) ->
- query_resource_async(Config, Request, _Opts = #{}).
- query_resource_async(Config, Request, Opts) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- Ref = alias([reply]),
- AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- Timeout = maps:get(timeout, Opts, 500),
- Return = emqx_resource:query(ResourceID, Request, #{
- timeout => Timeout,
- async_reply_fun => {AsyncReplyFun, []}
- }),
- {Return, Ref}.
- receive_result(Ref, Timeout) ->
- receive
- {result, Ref, Result} ->
- {ok, Result};
- {Ref, Result} ->
- {ok, Result}
- after Timeout ->
- timeout
- end.
- connect_direct_pgsql(Config) ->
- Opts = #{
- host => ?config(pgsql_host, Config),
- port => ?config(pgsql_port, Config),
- username => ?PGSQL_USERNAME,
- password => ?PGSQL_PASSWORD,
- database => ?PGSQL_DATABASE
- },
- SslOpts =
- case ?config(enable_tls, Config) of
- true ->
- Opts#{
- ssl => true,
- ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
- };
- false ->
- Opts
- end,
- {ok, Con} = epgsql:connect(SslOpts),
- Con.
- % These funs connect and then stop the pgsql connection
- connect_and_create_table(Config) ->
- Con = connect_direct_pgsql(Config),
- {ok, _, _} = epgsql:squery(Con, ?SQL_CREATE_TABLE),
- ok = epgsql:close(Con).
- connect_and_drop_table(Config) ->
- Con = connect_direct_pgsql(Config),
- {ok, _, _} = epgsql:squery(Con, ?SQL_DROP_TABLE),
- ok = epgsql:close(Con).
- connect_and_clear_table(Config) ->
- Con = connect_direct_pgsql(Config),
- {ok, _} = epgsql:squery(Con, ?SQL_DELETE),
- ok = epgsql:close(Con).
- connect_and_get_payload(Config) ->
- Con = connect_direct_pgsql(Config),
- {ok, _, [{Result}]} = epgsql:squery(Con, ?SQL_SELECT),
- ok = epgsql:close(Con),
- Result.
- %%------------------------------------------------------------------------------
- %% Testcases
- %%------------------------------------------------------------------------------
- t_setup_via_config_and_publish(Config) ->
- ?assertMatch(
- {ok, _},
- create_bridge(Config)
- ),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- ?check_trace(
- begin
- {_, {ok, _}} =
- ?wait_async_action(
- send_message(Config, SentData),
- #{?snk_kind := pgsql_connector_query_return},
- 10_000
- ),
- ?assertMatch(
- Val,
- connect_and_get_payload(Config)
- ),
- ok
- end,
- fun(Trace0) ->
- Trace = ?of_kind(pgsql_connector_query_return, Trace0),
- case ?config(enable_batch, Config) of
- true ->
- ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
- false ->
- ?assertMatch([#{result := {ok, 1}}], Trace)
- end,
- ok
- end
- ),
- ok.
- t_setup_via_http_api_and_publish(Config) ->
- BridgeType = ?config(pgsql_bridge_type, Config),
- Name = ?config(pgsql_name, Config),
- PgsqlConfig0 = ?config(pgsql_config, Config),
- QueryMode = ?config(query_mode, Config),
- PgsqlConfig = PgsqlConfig0#{
- <<"name">> => Name,
- <<"type">> => BridgeType
- },
- ?assertMatch(
- {ok, _},
- create_bridge_http(PgsqlConfig)
- ),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- ?check_trace(
- begin
- {Res, {ok, _}} =
- ?wait_async_action(
- send_message(Config, SentData),
- #{?snk_kind := pgsql_connector_query_return},
- 10_000
- ),
- case QueryMode of
- async ->
- ok;
- sync ->
- ?assertEqual({ok, 1}, Res)
- end,
- ?assertMatch(
- Val,
- connect_and_get_payload(Config)
- ),
- ok
- end,
- fun(Trace0) ->
- Trace = ?of_kind(pgsql_connector_query_return, Trace0),
- case ?config(enable_batch, Config) of
- true ->
- ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
- false ->
- ?assertMatch([#{result := {ok, 1}}], Trace)
- end,
- ok
- end
- ),
- ok.
- t_get_status(Config) ->
- ?assertMatch(
- {ok, _},
- create_bridge(Config)
- ),
- ProxyPort = ?config(proxy_port, Config),
- ProxyHost = ?config(proxy_host, Config),
- ProxyName = ?config(proxy_name, Config),
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
- emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- ?assertMatch(
- {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
- emqx_resource_manager:health_check(ResourceID)
- )
- end),
- ok.
- t_create_disconnected(Config) ->
- ProxyPort = ?config(proxy_port, Config),
- ProxyHost = ?config(proxy_host, Config),
- ProxyName = ?config(proxy_name, Config),
- ?check_trace(
- emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- ?assertMatch({ok, _}, create_bridge(Config))
- end),
- fun(Trace) ->
- ?assertMatch(
- [#{error := {start_pool_failed, _, _}}],
- ?of_kind(pgsql_connector_start_failed, Trace)
- ),
- ok
- end
- ),
- ok.
- t_write_failure(Config) ->
- ProxyName = ?config(proxy_name, Config),
- ProxyPort = ?config(proxy_port, Config),
- ProxyHost = ?config(proxy_host, Config),
- QueryMode = ?config(query_mode, Config),
- {ok, _} = create_bridge(Config),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- ?check_trace(
- emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- {_, {ok, _}} =
- ?wait_async_action(
- case QueryMode of
- sync ->
- ?assertMatch({error, _}, send_message(Config, SentData));
- async ->
- send_message(Config, SentData)
- end,
- #{?snk_kind := buffer_worker_flush_nack},
- 15_000
- )
- end),
- fun(Trace0) ->
- ct:pal("trace: ~p", [Trace0]),
- Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
- ?assertMatch([#{result := {error, _}} | _], Trace),
- [#{result := {error, Error}} | _] = Trace,
- case Error of
- {resource_error, _} ->
- ok;
- {recoverable_error, disconnected} ->
- ok;
- _ ->
- ct:fail("unexpected error: ~p", [Error])
- end
- end
- ),
- ok.
- % This test doesn't work with batch enabled since it is not possible
- % to set the timeout directly for batch queries
- t_write_timeout(Config) ->
- ProxyName = ?config(proxy_name, Config),
- ProxyPort = ?config(proxy_port, Config),
- ProxyHost = ?config(proxy_host, Config),
- QueryMode = ?config(query_mode, Config),
- {ok, _} = create_bridge(
- Config,
- #{
- <<"resource_opts">> => #{
- <<"resume_interval">> => <<"100ms">>,
- <<"health_check_interval">> => <<"100ms">>
- }
- }
- ),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- {ok, SRef} = snabbkaffe:subscribe(
- ?match_event(#{?snk_kind := call_query_enter}),
- 2_000
- ),
- Res0 =
- emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
- Res1 =
- case QueryMode of
- async ->
- query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
- sync ->
- query_resource(Config, {send_message, SentData})
- end,
- ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
- Res1
- end),
- case Res0 of
- {_, Ref} when is_reference(Ref) ->
- case receive_result(Ref, 15_000) of
- {ok, Res} ->
- %% we may receive a successful result depending on
- %% timing, if the request is retried after the
- %% failure is healed.
- case Res of
- {error, {unrecoverable_error, _}} ->
- ok;
- {ok, _} ->
- ok;
- _ ->
- ct:fail("unexpected result: ~p", [Res])
- end;
- timeout ->
- ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
- ct:fail("no response received")
- end;
- _ ->
- ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
- end,
- ok.
- t_simple_sql_query(Config) ->
- EnableBatch = ?config(enable_batch, Config),
- QueryMode = ?config(query_mode, Config),
- ?assertMatch(
- {ok, _},
- create_bridge(Config)
- ),
- Request = {sql, <<"SELECT count(1) AS T">>},
- Result =
- case QueryMode of
- sync ->
- query_resource(Config, Request);
- async ->
- {_, Ref} = query_resource_async(Config, Request),
- {ok, Res} = receive_result(Ref, 2_000),
- Res
- end,
- case EnableBatch of
- true ->
- ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
- false ->
- ?assertMatch({ok, _, [{1}]}, Result)
- end,
- ok.
- t_missing_data(Config) ->
- ?assertMatch(
- {ok, _},
- create_bridge(Config)
- ),
- {_, {ok, Event}} =
- ?wait_async_action(
- send_message(Config, #{}),
- #{?snk_kind := buffer_worker_flush_ack},
- 2_000
- ),
- ?assertMatch(
- #{
- result :=
- {error,
- {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
- },
- Event
- ),
- ok.
- t_bad_sql_parameter(Config) ->
- QueryMode = ?config(query_mode, Config),
- EnableBatch = ?config(enable_batch, Config),
- ?assertMatch(
- {ok, _},
- create_bridge(Config)
- ),
- Request = {sql, <<"">>, [bad_parameter]},
- Result =
- case QueryMode of
- sync ->
- query_resource(Config, Request);
- async ->
- {_, Ref} = query_resource_async(Config, Request),
- {ok, Res} = receive_result(Ref, 2_000),
- Res
- end,
- case EnableBatch of
- true ->
- ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
- false ->
- ?assertMatch(
- {error, {unrecoverable_error, _}}, Result
- )
- end,
- ok.
- t_nasty_sql_string(Config) ->
- ?assertMatch({ok, _}, create_bridge(Config)),
- Payload = list_to_binary(lists:seq(1, 127)),
- Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
- {_, {ok, _}} =
- ?wait_async_action(
- send_message(Config, Message),
- #{?snk_kind := pgsql_connector_query_return},
- 1_000
- ),
- ?assertEqual(Payload, connect_and_get_payload(Config)).
- t_missing_table(Config) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- ?check_trace(
- begin
- connect_and_drop_table(Config),
- ?assertMatch({ok, _}, create_bridge(Config)),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertMatch(
- {ok, Status} when Status == connecting orelse Status == disconnected,
- emqx_resource_manager:health_check(ResourceID)
- )
- ),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- Timeout = 1000,
- ?assertMatch(
- {error, {resource_error, #{reason := unhealthy_target}}},
- query_resource(Config, {send_message, SentData, [], Timeout})
- ),
- ok
- end,
- fun(Trace) ->
- ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
- ok
- end
- ),
- connect_and_create_table(Config),
- ok.
- t_table_removed(Config) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- ?check_trace(
- begin
- connect_and_create_table(Config),
- ?assertMatch({ok, _}, create_bridge(Config)),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
- ),
- connect_and_drop_table(Config),
- Val = integer_to_binary(erlang:unique_integer()),
- SentData = #{payload => Val, timestamp => 1668602148000},
- case query_resource_sync(Config, {send_message, SentData, []}) of
- {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}} ->
- ok;
- ?RESOURCE_ERROR_M(not_connected, _) ->
- ok;
- Res ->
- ct:fail("unexpected result: ~p", [Res])
- end,
- ok
- end,
- []
- ),
- connect_and_create_table(Config),
- ok.
- t_concurrent_health_checks(Config) ->
- Name = ?config(pgsql_name, Config),
- BridgeType = ?config(pgsql_bridge_type, Config),
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
- ?check_trace(
- begin
- connect_and_create_table(Config),
- ?assertMatch({ok, _}, create_bridge(Config)),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
- ),
- emqx_utils:pmap(
- fun(_) ->
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
- end,
- lists:seq(1, 20)
- ),
- ok
- end,
- fun(Trace) ->
- ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
- ok
- end
- ),
- ok.
|