| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_bridge_v2_testlib).
- -compile(nowarn_export_all).
- -compile(export_all).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -include_lib("emqx_resource/include/emqx_resource.hrl").
- -import(emqx_common_test_helpers, [on_exit/1]).
- -define(ROOT_KEY_ACTIONS, actions).
- -define(ROOT_KEY_SOURCES, sources).
- %% ct setup helpers
- init_per_suite(Config, Apps) ->
- [{start_apps, Apps} | Config].
- end_per_suite(Config) ->
- delete_all_bridges_and_connectors(),
- emqx_mgmt_api_test_util:end_suite(),
- ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
- ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?config(start_apps, Config))),
- _ = application:stop(emqx_connector),
- ok.
- init_per_group(TestGroup, BridgeType, Config) ->
- ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
- ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- application:load(emqx_bridge),
- ok = emqx_common_test_helpers:start_apps([emqx_conf]),
- ok = emqx_connector_test_helpers:start_apps(?config(start_apps, Config)),
- {ok, _} = application:ensure_all_started(emqx_connector),
- emqx_mgmt_api_test_util:init_suite(),
- UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
- MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>,
- [
- {proxy_host, ProxyHost},
- {proxy_port, ProxyPort},
- {mqtt_topic, MQTTTopic},
- {test_group, TestGroup},
- {bridge_type, BridgeType}
- | Config
- ].
- end_per_group(Config) ->
- ProxyHost = ?config(proxy_host, Config),
- ProxyPort = ?config(proxy_port, Config),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- % delete_all_bridges(),
- ok.
- init_per_testcase(TestCase, Config0, BridgeConfigCb) ->
- ct:timetrap(timer:seconds(60)),
- delete_all_bridges_and_connectors(),
- UniqueNum = integer_to_binary(erlang:unique_integer()),
- BridgeTopic =
- <<
- (atom_to_binary(TestCase))/binary,
- UniqueNum/binary
- >>,
- TestGroup = ?config(test_group, Config0),
- Config = [{bridge_topic, BridgeTopic} | Config0],
- {Name, ConfigString, BridgeConfig} = BridgeConfigCb(
- TestCase, TestGroup, Config
- ),
- ok = snabbkaffe:start_trace(),
- [
- {bridge_name, Name},
- {bridge_config_string, ConfigString},
- {bridge_config, BridgeConfig}
- | Config
- ].
- end_per_testcase(_Testcase, Config) ->
- case proplists:get_bool(skip_does_not_apply, Config) of
- true ->
- ok;
- false ->
- ProxyHost = ?config(proxy_host, Config),
- ProxyPort = ?config(proxy_port, Config),
- emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
- %% in CI, apparently this needs more time since the
- %% machines struggle with all the containers running...
- emqx_common_test_helpers:call_janitor(60_000),
- delete_all_bridges_and_connectors(),
- ok = snabbkaffe:stop(),
- ok
- end.
- delete_all_bridges_and_connectors() ->
- delete_all_bridges(),
- delete_all_connectors().
- delete_all_bridges() ->
- lists:foreach(
- fun(#{name := Name, type := Type}) ->
- emqx_bridge_v2:remove(actions, Type, Name)
- end,
- emqx_bridge_v2:list(actions)
- ),
- lists:foreach(
- fun(#{name := Name, type := Type}) ->
- emqx_bridge_v2:remove(sources, Type, Name)
- end,
- emqx_bridge_v2:list(sources)
- ).
- delete_all_connectors() ->
- lists:foreach(
- fun(#{name := Name, type := Type}) ->
- emqx_connector:remove(Type, Name)
- end,
- emqx_connector:list()
- ).
- %% test helpers
- parse_and_check(Type, Name, InnerConfigMap0) ->
- parse_and_check(action, Type, Name, InnerConfigMap0).
- parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
- RootBin =
- case Kind of
- action -> <<"actions">>;
- source -> <<"sources">>
- end,
- TypeBin = emqx_utils_conv:bin(Type),
- RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
- do_parse_and_check(RootBin, TypeBin, Name, emqx_bridge_v2_schema, RawConf).
- parse_and_check_connector(Type, Name, InnerConfigMap0) ->
- TypeBin = emqx_utils_conv:bin(Type),
- RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
- do_parse_and_check(<<"connectors">>, TypeBin, Name, emqx_connector_schema, RawConf).
- do_parse_and_check(RootBin, TypeBin, NameBin, SchemaMod, RawConf) ->
- #{RootBin := #{TypeBin := #{NameBin := _}}} = hocon_tconf:check_plain(
- SchemaMod,
- RawConf,
- #{
- required => false,
- atom_key => false,
- %% to trigger validators that otherwise aren't triggered
- make_serializable => false
- }
- ),
- #{RootBin := #{TypeBin := #{NameBin := InnerConfigMap}}} = hocon_tconf:check_plain(
- SchemaMod,
- RawConf,
- #{
- required => false,
- atom_key => false,
- make_serializable => true
- }
- ),
- InnerConfigMap.
- bridge_id(Config) ->
- BridgeType = ?config(bridge_type, Config),
- BridgeName = ?config(bridge_name, Config),
- BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
- ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
- <<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
- source_hookpoint(Config) ->
- #{kind := source, type := Type, name := Name} = get_common_values(Config),
- BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
- emqx_bridge_v2:source_hookpoint(BridgeId).
- action_hookpoint(Config) ->
- #{kind := action, type := Type, name := Name} = get_common_values(Config),
- BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
- emqx_bridge_resource:bridge_hookpoint(BridgeId).
- add_source_hookpoint(Config) ->
- Hookpoint = source_hookpoint(Config),
- ok = emqx_hooks:add(Hookpoint, {?MODULE, source_hookpoint_callback, [self()]}, 1000),
- on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, source_hookpoint_callback}) end),
- ok.
- resource_id(Config) ->
- #{
- kind := Kind,
- type := Type,
- name := Name,
- connector_name := ConnectorName
- } = get_common_values(Config),
- case Kind of
- source ->
- emqx_bridge_v2:source_id(Type, Name, ConnectorName);
- action ->
- emqx_bridge_resource:resource_id(Type, Name)
- end.
- create_bridge(Config) ->
- create_bridge(Config, _Overrides = #{}).
- create_bridge(Config, Overrides) ->
- BridgeType = ?config(bridge_type, Config),
- BridgeName = ?config(bridge_name, Config),
- BridgeConfig0 = ?config(bridge_config, Config),
- BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
- ConnectorName = ?config(connector_name, Config),
- ConnectorType = ?config(connector_type, Config),
- ConnectorConfig = ?config(connector_config, Config),
- ct:pal("creating connector with config: ~p", [ConnectorConfig]),
- {ok, _} =
- emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig),
- ct:pal("creating bridge with config: ~p", [BridgeConfig]),
- emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
- get_ct_config_with_fallback(Config, [Key]) ->
- ?config(Key, Config);
- get_ct_config_with_fallback(Config, [Key | Rest]) ->
- case ?config(Key, Config) of
- undefined ->
- get_ct_config_with_fallback(Config, Rest);
- X ->
- X
- end.
- get_config_by_kind(Config, Overrides) ->
- Kind = ?config(bridge_kind, Config),
- get_config_by_kind(Kind, Config, Overrides).
- get_config_by_kind(Kind, Config, Overrides) ->
- case Kind of
- action ->
- %% TODO: refactor tests to use action_type...
- ActionType = get_ct_config_with_fallback(Config, [action_type, bridge_type]),
- ActionName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
- ActionConfig0 = get_ct_config_with_fallback(Config, [action_config, bridge_config]),
- ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides),
- #{type => ActionType, name => ActionName, config => ActionConfig};
- source ->
- SourceType = ?config(source_type, Config),
- SourceName = ?config(source_name, Config),
- SourceConfig0 = ?config(source_config, Config),
- SourceConfig = emqx_utils_maps:deep_merge(SourceConfig0, Overrides),
- #{type => SourceType, name => SourceName, config => SourceConfig}
- end.
- api_path_root(Kind) ->
- case Kind of
- action -> "actions";
- source -> "sources"
- end.
- conf_root_key(Kind) ->
- case Kind of
- action -> ?ROOT_KEY_ACTIONS;
- source -> ?ROOT_KEY_SOURCES
- end.
- maybe_json_decode(X) ->
- case emqx_utils_json:safe_decode(X, [return_maps]) of
- {ok, Decoded} -> Decoded;
- {error, _} -> X
- end.
- request(Method, Path, Params) ->
- AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
- Opts = #{return_all => true},
- case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
- {ok, {Status, Headers, Body0}} ->
- Body = maybe_json_decode(Body0),
- {ok, {Status, Headers, Body}};
- {error, {Status, Headers, Body0}} ->
- Body =
- case emqx_utils_json:safe_decode(Body0, [return_maps]) of
- {ok, Decoded0 = #{<<"message">> := Msg0}} ->
- Msg = maybe_json_decode(Msg0),
- Decoded0#{<<"message">> := Msg};
- {ok, Decoded0} ->
- Decoded0;
- {error, _} ->
- Body0
- end,
- {error, {Status, Headers, Body}};
- Error ->
- Error
- end.
- list_bridges_api() ->
- Params = [],
- Path = emqx_mgmt_api_test_util:api_path(["actions"]),
- AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
- Opts = #{return_all => true},
- ct:pal("listing bridges (via http)"),
- Res =
- case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
- {ok, {Status, Headers, Body0}} ->
- {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
- Error ->
- Error
- end,
- ct:pal("list bridges result: ~p", [Res]),
- Res.
- get_source_api(BridgeType, BridgeName) ->
- get_bridge_api(source, BridgeType, BridgeName).
- get_bridge_api(BridgeType, BridgeName) ->
- get_bridge_api(action, BridgeType, BridgeName).
- get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
- BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
- Params = [],
- Root =
- case BridgeKind of
- source -> "sources";
- action -> "actions"
- end,
- Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
- ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
- Res = request(get, Path, Params),
- ct:pal("get bridge ~p result: ~p", [{BridgeKind, BridgeType, BridgeName}, Res]),
- Res.
- create_bridge_api(Config) ->
- create_bridge_api(Config, _Overrides = #{}).
- create_bridge_api(Config, Overrides) ->
- {ok, {{_, 201, _}, _, _}} = create_connector_api(Config),
- create_kind_api(Config, Overrides).
- create_kind_api(Config) ->
- create_kind_api(Config, _Overrides = #{}).
- create_kind_api(Config, Overrides) ->
- Kind = proplists:get_value(bridge_kind, Config, action),
- #{
- type := Type,
- name := Name,
- config := BridgeConfig
- } = get_config_by_kind(Kind, Config, Overrides),
- Params = BridgeConfig#{<<"type">> => Type, <<"name">> => Name},
- PathRoot = api_path_root(Kind),
- Path = emqx_mgmt_api_test_util:api_path([PathRoot]),
- ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]),
- Res = request(post, Path, Params),
- ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
- Res.
- create_connector_api(Config) ->
- create_connector_api(Config, _Overrides = #{}).
- create_connector_api(Config, Overrides) ->
- ConnectorConfig0 = ?config(connector_config, Config),
- ConnectorName = ?config(connector_name, Config),
- ConnectorType = ?config(connector_type, Config),
- ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
- create_connector_api(ConnectorName, ConnectorType, ConnectorConfig).
- create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
- Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
- Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
- ct:pal("creating connector (http):\n ~p", [Params]),
- Res = request(post, Path, Params),
- ct:pal("connector create (http) result:\n ~p", [Res]),
- Res.
- update_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
- ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
- Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
- ct:pal("updating connector ~s (http):\n ~p", [ConnectorId, ConnectorConfig]),
- Res = request(put, Path, ConnectorConfig),
- ct:pal("connector update (http) result:\n ~p", [Res]),
- Res.
- start_connector_api(ConnectorName, ConnectorType) ->
- ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
- Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "start"]),
- ct:pal("starting connector ~s (http)", [ConnectorId]),
- Res = request(post, Path, #{}),
- ct:pal("connector update (http) result:\n ~p", [Res]),
- Res.
- get_connector_api(ConnectorType, ConnectorName) ->
- ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
- Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
- ct:pal("get connector ~s (http)", [ConnectorId]),
- Res = request(get, Path, _Params = []),
- ct:pal("get connector (http) result:\n ~p", [Res]),
- Res.
- create_action_api(Config) ->
- create_action_api(Config, _Overrides = #{}).
- create_action_api(Config, Overrides) ->
- ActionName = ?config(action_name, Config),
- ActionType = ?config(action_type, Config),
- ActionConfig0 = ?config(action_config, Config),
- ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides),
- Params = ActionConfig#{<<"type">> => ActionType, <<"name">> => ActionName},
- Path = emqx_mgmt_api_test_util:api_path(["actions"]),
- ct:pal("creating action (http):\n ~p", [Params]),
- Res = request(post, Path, Params),
- ct:pal("action create (http) result:\n ~p", [Res]),
- Res.
- get_action_api(Config) ->
- ActionName = ?config(action_name, Config),
- ActionType = ?config(action_type, Config),
- ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
- Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId]),
- ct:pal("getting action (http)"),
- Res = request(get, Path, []),
- ct:pal("get action (http) result:\n ~p", [Res]),
- Res.
- update_bridge_api(Config) ->
- update_bridge_api(Config, _Overrides = #{}).
- update_bridge_api(Config, Overrides) ->
- Kind = proplists:get_value(bridge_kind, Config, action),
- #{
- type := Type,
- name := Name,
- config := Params
- } = get_config_by_kind(Kind, Config, Overrides),
- BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
- PathRoot = api_path_root(Kind),
- Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]),
- ct:pal("updating bridge (~s, http):\n ~p", [Kind, Params]),
- Res = request(put, Path, Params),
- ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]),
- Res.
- op_bridge_api(Op, BridgeType, BridgeName) ->
- op_bridge_api(_Kind = action, Op, BridgeType, BridgeName).
- op_bridge_api(Kind, Op, BridgeType, BridgeName) ->
- BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
- PathRoot = api_path_root(Kind),
- Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId, Op]),
- ct:pal("calling bridge ~p (~s, http):\n ~p", [BridgeId, Kind, Op]),
- Method = post,
- Params = [],
- Res = request(Method, Path, Params),
- ct:pal("bridge op result:\n ~p", [Res]),
- Res.
- probe_bridge_api(Config) ->
- probe_bridge_api(Config, _Overrides = #{}).
- probe_bridge_api(Config, Overrides) ->
- BridgeType = ?config(bridge_type, Config),
- BridgeName = ?config(bridge_name, Config),
- BridgeConfig0 = ?config(bridge_config, Config),
- BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
- probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
- probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
- probe_bridge_api(action, BridgeType, BridgeName, BridgeConfig).
- probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) ->
- Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
- PathRoot = api_path_root(Kind),
- Path = emqx_mgmt_api_test_util:api_path([PathRoot ++ "_probe"]),
- ct:pal("probing bridge (~s, http):\n ~p", [Kind, Params]),
- Method = post,
- Res = request(Method, Path, Params),
- ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]),
- Res.
- probe_connector_api(Config) ->
- probe_connector_api(Config, _Overrides = #{}).
- probe_connector_api(Config, Overrides) ->
- #{
- connector_type := Type,
- connector_name := Name
- } = get_common_values(Config),
- ConnectorConfig0 = get_value(connector_config, Config),
- ConnectorConfig1 = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
- Params = ConnectorConfig1#{<<"type">> => Type, <<"name">> => Name},
- Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]),
- ct:pal("probing connector (~s, http):\n ~p", [Type, Params]),
- Method = post,
- Res = request(Method, Path, Params),
- ct:pal("probing connector (~s, http) result:\n ~p", [Type, Res]),
- Res.
- list_bridges_http_api_v1() ->
- Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
- ct:pal("list bridges (http v1)"),
- Res = request(get, Path, _Params = []),
- ct:pal("list bridges (http v1) result:\n ~p", [Res]),
- Res.
- list_actions_http_api() ->
- Path = emqx_mgmt_api_test_util:api_path(["actions"]),
- ct:pal("list actions (http v2)"),
- Res = request(get, Path, _Params = []),
- ct:pal("list actions (http v2) result:\n ~p", [Res]),
- Res.
- list_sources_http_api() ->
- Path = emqx_mgmt_api_test_util:api_path(["sources"]),
- ct:pal("list sources (http v2)"),
- Res = request(get, Path, _Params = []),
- ct:pal("list sources (http v2) result:\n ~p", [Res]),
- Res.
- list_connectors_http_api() ->
- Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
- ct:pal("list connectors"),
- Res = request(get, Path, _Params = []),
- ct:pal("list connectors result:\n ~p", [Res]),
- Res.
- update_rule_http(RuleId, Params) ->
- Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
- ct:pal("update rule ~p:\n ~p", [RuleId, Params]),
- Res = request(put, Path, Params),
- ct:pal("update rule ~p result:\n ~p", [RuleId, Res]),
- Res.
- enable_rule_http(RuleId) ->
- Params = #{<<"enable">> => true},
- update_rule_http(RuleId, Params).
- is_rule_enabled(RuleId) ->
- {ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId),
- Enable.
- try_decode_error(Body0) ->
- case emqx_utils_json:safe_decode(Body0, [return_maps]) of
- {ok, #{<<"message">> := Msg0} = Body1} ->
- case emqx_utils_json:safe_decode(Msg0, [return_maps]) of
- {ok, Msg1} -> Body1#{<<"message">> := Msg1};
- {error, _} -> Body1
- end;
- {ok, Body1} ->
- Body1;
- {error, _} ->
- Body0
- end.
- create_rule_api(Opts) ->
- #{
- sql := SQL,
- actions := RuleActions
- } = Opts,
- Params = #{
- enable => true,
- sql => SQL,
- actions => RuleActions
- },
- Path = emqx_mgmt_api_test_util:api_path(["rules"]),
- ct:pal("create rule:\n ~p", [Params]),
- Method = post,
- Res = request(Method, Path, Params),
- ct:pal("create rule results:\n ~p", [Res]),
- Res.
- create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
- create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
- create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
- BridgeName = ?config(bridge_name, Config),
- BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
- SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
- Params0 = #{
- enable => true,
- sql => SQL,
- actions => [BridgeId]
- },
- Overrides = maps:get(overrides, Opts, #{}),
- Params = emqx_utils_maps:deep_merge(Params0, Overrides),
- Path = emqx_mgmt_api_test_util:api_path(["rules"]),
- AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
- ct:pal("rule action params: ~p", [Params]),
- case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
- {ok, Res0} ->
- Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
- on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
- {ok, Res};
- Error ->
- Error
- end.
- api_spec_schemas(Root) ->
- Method = get,
- Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]),
- Params = [],
- AuthHeader = [],
- Opts = #{return_all => true},
- case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
- {ok, {{_, 200, _}, _, Res0}} ->
- #{<<"components">> := #{<<"schemas">> := Schemas}} =
- emqx_utils_json:decode(Res0, [return_maps]),
- Schemas
- end.
- bridges_api_spec_schemas() ->
- api_spec_schemas("bridges").
- actions_api_spec_schemas() ->
- api_spec_schemas("actions").
- get_value(Key, Config) ->
- case proplists:get_value(Key, Config, undefined) of
- undefined ->
- error({missing_required_config, Key, Config});
- Value ->
- Value
- end.
- get_common_values(Config) ->
- Kind = proplists:get_value(bridge_kind, Config, action),
- case Kind of
- action ->
- #{
- conf_root_key => actions,
- kind => Kind,
- type => get_ct_config_with_fallback(Config, [action_type, bridge_type]),
- name => get_ct_config_with_fallback(Config, [action_name, bridge_name]),
- connector_type => get_value(connector_type, Config),
- connector_name => get_value(connector_name, Config)
- };
- source ->
- #{
- conf_root_key => sources,
- kind => Kind,
- type => get_value(source_type, Config),
- name => get_value(source_name, Config),
- connector_type => get_value(connector_type, Config),
- connector_name => get_value(connector_name, Config)
- }
- end.
- connector_resource_id(Config) ->
- #{connector_type := Type, connector_name := Name} = get_common_values(Config),
- emqx_connector_resource:resource_id(Type, Name).
- health_check_channel(Config) ->
- ConnectorResId = connector_resource_id(Config),
- ChannelResId = resource_id(Config),
- emqx_resource_manager:channel_health_check(ConnectorResId, ChannelResId).
- %%------------------------------------------------------------------------------
- %% Internal export
- %%------------------------------------------------------------------------------
- source_hookpoint_callback(Message, TestPid) ->
- TestPid ! {consumed_message, Message},
- ok.
- %%------------------------------------------------------------------------------
- %% Testcases
- %%------------------------------------------------------------------------------
- t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
- ?check_trace(
- begin
- ?assertMatch({ok, _}, create_bridge_api(Config)),
- ResourceId = resource_id(Config),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- ),
- BridgeId = bridge_id(Config),
- Message = {BridgeId, MakeMessageFun()},
- IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
- ok
- end,
- fun(Trace) ->
- ResourceId = resource_id(Config),
- ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
- end
- ),
- ok.
- t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
- ReplyFun =
- fun(Pid, Result) ->
- Pid ! {result, Result}
- end,
- ?check_trace(
- begin
- ?assertMatch({ok, _}, create_bridge_api(Config)),
- ResourceId = resource_id(Config),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- ),
- BridgeId = bridge_id(Config),
- BridgeType = ?config(bridge_type, Config),
- BridgeName = ?config(bridge_name, Config),
- Message = {BridgeId, MakeMessageFun()},
- ?assertMatch(
- {ok, {ok, _}},
- ?wait_async_action(
- emqx_bridge_v2:query(BridgeType, BridgeName, Message, #{
- async_reply_fun => {ReplyFun, [self()]}
- }),
- #{?snk_kind := TracePoint, instance_id := ResourceId},
- 5_000
- )
- ),
- ok
- end,
- fun(Trace) ->
- ResourceId = resource_id(Config),
- ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
- end
- ),
- receive
- {result, Result} -> IsSuccessCheck(Result)
- after 8_000 ->
- throw(timeout)
- end,
- ok.
- %% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
- %% a `{function(), integer()}' tuple.
- %% - `Tracepoint': marks the end of consumed message processing.
- t_consume(Config, Opts) ->
- #{
- consumer_ready_tracepoint := ConsumerReadyTPFn,
- produce_fn := ProduceFn,
- check_fn := CheckFn,
- produce_tracepoint := TracePointFn
- } = Opts,
- ?check_trace(
- begin
- ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
- case ConsumerReadyTPFn of
- {Predicate, NEvents} when is_function(Predicate) ->
- {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout);
- Predicate when is_function(Predicate) ->
- {ok, SRef0} = snabbkaffe:subscribe(
- Predicate, _NEvents = 1, ConsumerReadyTimeout
- )
- end,
- ?assertMatch({ok, _}, create_bridge_api(Config)),
- ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
- ok = add_source_hookpoint(Config),
- ?retry(
- _Sleep = 200,
- _Attempts = 20,
- ?assertMatch(
- #{status := ?status_connected},
- health_check_channel(Config)
- )
- ),
- ?assertMatch(
- {_, {ok, _}},
- snabbkaffe:wait_async_action(
- ProduceFn,
- TracePointFn,
- 15_000
- )
- ),
- receive
- {consumed_message, Message} ->
- CheckFn(Message)
- after 5_000 ->
- error({timeout, process_info(self(), messages)})
- end,
- ok
- end,
- []
- ),
- ok.
- t_create_via_http(Config) ->
- ?check_trace(
- begin
- ?assertMatch({ok, _}, create_bridge_api(Config)),
- ?assertMatch(
- {ok, _},
- update_bridge_api(
- Config
- )
- ),
- %% check that v1 list API is fine
- ?assertMatch(
- {ok, {{_, 200, _}, _, _}},
- list_bridges_http_api_v1()
- ),
- ok
- end,
- []
- ),
- ok.
- t_start_stop(Config, StopTracePoint) ->
- Kind = proplists:get_value(bridge_kind, Config, action),
- ConnectorName = ?config(connector_name, Config),
- ConnectorType = ?config(connector_type, Config),
- #{
- type := Type,
- name := Name,
- config := BridgeConfig
- } = get_config_by_kind(Kind, Config, _Overrides = #{}),
- ?assertMatch(
- {ok, {{_, 201, _}, _, _}},
- create_connector_api(Config)
- ),
- ?check_trace(
- begin
- ?assertMatch(
- {ok, {{_, 204, _}, _Headers, _Body}},
- probe_bridge_api(
- Kind,
- Type,
- Name,
- BridgeConfig
- )
- ),
- %% Check that the bridge probe API doesn't leak atoms.
- AtomsBefore = erlang:system_info(atom_count),
- %% Probe again; shouldn't have created more atoms.
- ProbeRes1 = probe_bridge_api(
- Kind,
- Type,
- Name,
- BridgeConfig
- ),
- ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
- AtomsAfter = erlang:system_info(atom_count),
- ?assertEqual(AtomsBefore, AtomsAfter),
- ?assertMatch({ok, _}, create_kind_api(Config)),
- ResourceId = emqx_bridge_resource:resource_id(conf_root_key(Kind), Type, Name),
- %% Since the connection process is async, we give it some time to
- %% stabilize and avoid flakiness.
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- ),
- %% `start` bridge to trigger `already_started`
- ?assertMatch(
- {ok, {{_, 204, _}, _Headers, []}},
- op_bridge_api(Kind, "start", Type, Name)
- ),
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
- %% Not supported anymore
- %% ?assertMatch(
- %% {{ok, _}, {ok, _}},
- %% ?wait_async_action(
- %% emqx_bridge_v2_testlib:op_bridge_api("stop", BridgeType, BridgeName),
- %% #{?snk_kind := StopTracePoint},
- %% 5_000
- %% )
- %% ),
- %% ?assertEqual(
- %% {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
- %% ),
- %% ?assertMatch(
- %% {ok, {{_, 204, _}, _Headers, []}},
- %% emqx_bridge_v2_testlib:op_bridge_api("stop", BridgeType, BridgeName)
- %% ),
- %% ?assertEqual(
- %% {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
- %% ),
- %% ?assertMatch(
- %% {ok, {{_, 204, _}, _Headers, []}},
- %% emqx_bridge_v2_testlib:op_bridge_api("start", BridgeType, BridgeName)
- %% ),
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- ),
- %% Disable the connector, which will also stop it.
- ?assertMatch(
- {{ok, _}, {ok, _}},
- ?wait_async_action(
- emqx_connector:disable_enable(disable, ConnectorType, ConnectorName),
- #{?snk_kind := StopTracePoint},
- 5_000
- )
- ),
- #{resource_id => ResourceId}
- end,
- fun(Res, Trace) ->
- #{resource_id := ResourceId} = Res,
- %% one for each probe, one for real
- ?assertMatch(
- [_, _, #{instance_id := ResourceId}],
- ?of_kind(StopTracePoint, Trace)
- ),
- ok
- end
- ),
- ok.
- t_on_get_status(Config) ->
- t_on_get_status(Config, _Opts = #{}).
- t_on_get_status(Config, Opts) ->
- ProxyPort = ?config(proxy_port, Config),
- ProxyHost = ?config(proxy_host, Config),
- ProxyName = ?config(proxy_name, Config),
- FailureStatus = maps:get(failure_status, Opts, disconnected),
- ?assertMatch({ok, _}, create_bridge(Config)),
- ResourceId = resource_id(Config),
- %% Since the connection process is async, we give it some time to
- %% stabilize and avoid flakiness.
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- ),
- case ProxyHost of
- undefined ->
- ok;
- _ ->
- emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- ?retry(
- _Interval0 = 100,
- _Attempts0 = 20,
- ?assertEqual(
- {ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)
- )
- )
- end),
- %% Check that it recovers itself.
- ?retry(
- _Sleep = 1_000,
- _Attempts = 20,
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
- )
- end,
- ok.
|