emqx_bridge_v2_testlib.erl 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_v2_testlib).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include_lib("emqx_resource/include/emqx_resource.hrl").
  11. -import(emqx_common_test_helpers, [on_exit/1]).
  12. -define(ROOT_KEY_ACTIONS, actions).
  13. -define(ROOT_KEY_SOURCES, sources).
  14. %% ct setup helpers
  %% @doc Suite setup helper: prepends `{start_apps, Apps}' to the CT `Config'
  %% proplist so that later callbacks can look the application list up.
  init_per_suite(Config, Apps) ->
      StartApps = {start_apps, Apps},
      [StartApps | Config].
  %% @doc Suite teardown helper. Removes all bridges and connectors, ends the
  %% management API test suite, then stops `emqx_conf', the applications stored
  %% under `start_apps' (in reverse order) and finally `emqx_connector'.
  end_per_suite(Config) ->
      delete_all_bridges_and_connectors(),
      emqx_mgmt_api_test_util:end_suite(),
      ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
      StartedApps = ?config(start_apps, Config),
      ok = emqx_connector_test_helpers:stop_apps(lists:reverse(StartedApps)),
      %% The result of stopping emqx_connector is deliberately ignored.
      _ = application:stop(emqx_connector),
      ok.
  %% @doc Group setup helper. Resets the proxy found via the `PROXY_HOST' /
  %% `PROXY_PORT' environment variables (defaults: "toxiproxy" / "8474"), starts
  %% the required applications and returns `Config' extended with `proxy_host',
  %% `proxy_port', a unique `mqtt_topic', `test_group' and `bridge_type'.
  init_per_group(TestGroup, BridgeType, Config) ->
      Host = os:getenv("PROXY_HOST", "toxiproxy"),
      Port = list_to_integer(os:getenv("PROXY_PORT", "8474")),
      emqx_common_test_helpers:reset_proxy(Host, Port),
      application:load(emqx_bridge),
      ok = emqx_common_test_helpers:start_apps([emqx_conf]),
      ok = emqx_connector_test_helpers:start_apps(?config(start_apps, Config)),
      {ok, _} = application:ensure_all_started(emqx_connector),
      emqx_mgmt_api_test_util:init_suite(),
      %% A positive unique integer keeps the MQTT topic distinct per group run.
      Suffix = integer_to_binary(erlang:unique_integer([positive])),
      GroupProps = [
          {proxy_host, Host},
          {proxy_port, Port},
          {mqtt_topic, <<"mqtt/topic/abc", Suffix/binary>>},
          {test_group, TestGroup},
          {bridge_type, BridgeType}
      ],
      GroupProps ++ Config.
  %% @doc Group teardown helper: resets the proxy recorded under `proxy_host' /
  %% `proxy_port' in `Config'.
  end_per_group(Config) ->
      emqx_common_test_helpers:reset_proxy(
          ?config(proxy_host, Config),
          ?config(proxy_port, Config)
      ),
      %% NOTE: the call to delete_all_bridges/0 is disabled here.
      ok.
  %% @doc Test case setup helper.
  %%
  %% Sets a 60 second timetrap, removes all bridges and connectors, derives a
  %% `bridge_topic' from the test case name plus a unique integer and calls
  %% `BridgeConfigCb(TestCase, TestGroup, Config)', which must return
  %% `{Name, ConfigString, BridgeConfig}'. A snabbkaffe trace is started and the
  %% callback results are returned in the config as `bridge_name',
  %% `bridge_config_string' and `bridge_config'.
  init_per_testcase(TestCase, Config0, BridgeConfigCb) ->
      ct:timetrap(timer:seconds(60)),
      delete_all_bridges_and_connectors(),
      Suffix = integer_to_binary(erlang:unique_integer()),
      TestCaseBin = atom_to_binary(TestCase),
      BridgeTopic = <<TestCaseBin/binary, Suffix/binary>>,
      Config1 = [{bridge_topic, BridgeTopic} | Config0],
      Group = ?config(test_group, Config0),
      {Name, ConfigString, BridgeConfig} = BridgeConfigCb(TestCase, Group, Config1),
      ok = snabbkaffe:start_trace(),
      TestCaseProps = [
          {bridge_name, Name},
          {bridge_config_string, ConfigString},
          {bridge_config, BridgeConfig}
      ],
      TestCaseProps ++ Config1.
  %% @doc Test case teardown helper. Does nothing when `skip_does_not_apply' is
  %% set in `Config'; otherwise resets the proxy, runs the janitor, removes all
  %% bridges and connectors and stops snabbkaffe.
  end_per_testcase(_Testcase, Config) ->
      case proplists:get_bool(skip_does_not_apply, Config) of
          false ->
              Host = ?config(proxy_host, Config),
              Port = ?config(proxy_port, Config),
              emqx_common_test_helpers:reset_proxy(Host, Port),
              %% in CI, apparently this needs more time since the
              %% machines struggle with all the containers running...
              emqx_common_test_helpers:call_janitor(60_000),
              delete_all_bridges_and_connectors(),
              ok = snabbkaffe:stop(),
              ok;
          true ->
              ok
      end.
  %% @doc Removes every bridge first, then every connector. Returns whatever
  %% delete_all_connectors/0 returns.
  delete_all_bridges_and_connectors() ->
      _ = delete_all_bridges(),
      delete_all_connectors().
  %% @doc Removes everything listed by `emqx_bridge_v2:list/1', first under the
  %% `actions' root and then under the `sources' root. The result of each
  %% individual removal is ignored.
  delete_all_bridges() ->
      RemoveAllUnder =
          fun(Root) ->
              lists:foreach(
                  fun(#{name := Name, type := Type}) ->
                      emqx_bridge_v2:remove(Root, Type, Name)
                  end,
                  emqx_bridge_v2:list(Root)
              )
          end,
      lists:foreach(RemoveAllUnder, [actions, sources]).
  %% @doc Removes every connector returned by `emqx_connector:list/0'. The result
  %% of each individual removal is ignored.
  delete_all_connectors() ->
      Remove =
          fun(#{name := Name, type := Type}) ->
              emqx_connector:remove(Type, Name)
          end,
      Connectors = emqx_connector:list(),
      lists:foreach(Remove, Connectors).
  108. %% test helpers
  %% @doc Same as parse_and_check/4 with `Kind' fixed to `action'.
  parse_and_check(Type, Name, InnerConfigMap0) ->
      parse_and_check(action, Type, Name, InnerConfigMap0).

  %% @doc Wraps `InnerConfigMap0' as `Root -> Type -> Name -> Config' (root is
  %% `<<"actions">>' or `<<"sources">>' depending on `Kind') and checks it
  %% against `emqx_bridge_v2_schema'. Returns the checked inner config.
  parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
      RootBin =
          case Kind of
              source -> <<"sources">>;
              action -> <<"actions">>
          end,
      TypeBin = emqx_utils_conv:bin(Type),
      ByName = #{Name => InnerConfigMap0},
      ByType = #{TypeBin => ByName},
      RawConf = #{RootBin => ByType},
      do_parse_and_check(RootBin, TypeBin, Name, emqx_bridge_v2_schema, RawConf).
  %% @doc Wraps `InnerConfigMap0' under `<<"connectors">> -> Type -> Name' and
  %% checks it against `emqx_connector_schema'. Returns the checked inner config.
  parse_and_check_connector(Type, Name, InnerConfigMap0) ->
      RootBin = <<"connectors">>,
      TypeBin = emqx_utils_conv:bin(Type),
      RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
      do_parse_and_check(RootBin, TypeBin, Name, emqx_connector_schema, RawConf).
  %% @doc Runs `hocon_tconf:check_plain/3' on `RawConf' twice with `SchemaMod':
  %% once with `make_serializable => false' and once with
  %% `make_serializable => true'. Both results must contain the
  %% `RootBin -> TypeBin -> NameBin' path; the inner config of the second
  %% (serializable) pass is returned.
  do_parse_and_check(RootBin, TypeBin, NameBin, SchemaMod, RawConf) ->
      CommonOpts = #{required => false, atom_key => false},
      %% to trigger validators that otherwise aren't triggered
      NonSerializableOpts = CommonOpts#{make_serializable => false},
      #{RootBin := #{TypeBin := #{NameBin := _}}} =
          hocon_tconf:check_plain(SchemaMod, RawConf, NonSerializableOpts),
      SerializableOpts = CommonOpts#{make_serializable => true},
      #{RootBin := #{TypeBin := #{NameBin := Checked}}} =
          hocon_tconf:check_plain(SchemaMod, RawConf, SerializableOpts),
      Checked.
  %% @doc Builds the binary `action:<BridgeId>:<ResourceId>' from the
  %% `bridge_type' and `bridge_name' entries of `Config', where both ids come
  %% from `emqx_bridge_resource'.
  bridge_id(Config) ->
      Type = ?config(bridge_type, Config),
      Name = ?config(bridge_name, Config),
      Id = emqx_bridge_resource:bridge_id(Type, Name),
      ResId = emqx_bridge_resource:resource_id(Type, Name),
      <<"action:", Id/binary, ":", ResId/binary>>.
  %% @doc Returns the source hookpoint for the source described by `Config'.
  %% Fails with `badmatch' unless the config describes a `source'.
  source_hookpoint(Config) ->
      Common = get_common_values(Config),
      #{kind := source, type := Type, name := Name} = Common,
      emqx_bridge_v2:source_hookpoint(emqx_bridge_resource:bridge_id(Type, Name)).
  %% @doc Returns the bridge hookpoint for the action described by `Config'.
  %% Fails with `badmatch' unless the config describes an `action'.
  action_hookpoint(Config) ->
      Common = get_common_values(Config),
      #{kind := action, type := Type, name := Name} = Common,
      emqx_bridge_resource:bridge_hookpoint(emqx_bridge_resource:bridge_id(Type, Name)).
  %% @doc Hooks source_hookpoint_callback/2 (with the calling process as extra
  %% argument) onto the source hookpoint of `Config', and registers an on-exit
  %% action that removes the hook again.
  add_source_hookpoint(Config) ->
      Hookpoint = source_hookpoint(Config),
      Mod = ?MODULE,
      Fn = source_hookpoint_callback,
      ok = emqx_hooks:add(Hookpoint, {Mod, Fn, [self()]}, 1000),
      on_exit(fun() -> emqx_hooks:del(Hookpoint, {Mod, Fn}) end),
      ok.
  %% @doc Returns the resource id of the action or source described by `Config'.
  %% Sources use `emqx_bridge_v2:source_id/3' (which also takes the connector
  %% name); actions use `emqx_bridge_resource:resource_id/2'.
  resource_id(Config) ->
      Common = get_common_values(Config),
      #{
          kind := Kind,
          type := Type,
          name := Name,
          connector_name := ConnectorName
      } = Common,
      case Kind of
          action ->
              emqx_bridge_resource:resource_id(Type, Name);
          source ->
              emqx_bridge_v2:source_id(Type, Name, ConnectorName)
      end.
  %% @doc Same as create_bridge/2 without overrides.
  create_bridge(Config) ->
      create_bridge(Config, #{}).

  %% @doc Creates the connector described by `connector_type', `connector_name'
  %% and `connector_config' (must succeed), then creates the bridge described by
  %% `bridge_type', `bridge_name' and `bridge_config' deep-merged with
  %% `Overrides'. Returns the result of `emqx_bridge_v2:create/3'.
  create_bridge(Config, Overrides) ->
      Type = ?config(bridge_type, Config),
      Name = ?config(bridge_name, Config),
      MergedConfig = emqx_utils_maps:deep_merge(?config(bridge_config, Config), Overrides),
      ConnName = ?config(connector_name, Config),
      ConnType = ?config(connector_type, Config),
      ConnConfig = ?config(connector_config, Config),
      ct:pal("creating connector with config: ~p", [ConnConfig]),
      {ok, _} = emqx_connector:create(ConnType, ConnName, ConnConfig),
      ct:pal("creating bridge with config: ~p", [MergedConfig]),
      emqx_bridge_v2:create(Type, Name, MergedConfig).
  %% @doc Looks the given keys up in `Config' in order and returns the first
  %% value that is not `undefined'. For the last key the lookup result is
  %% returned as is, so the overall result may be `undefined'.
  get_ct_config_with_fallback(Config, [LastKey]) ->
      ?config(LastKey, Config);
  get_ct_config_with_fallback(Config, [Key | Fallbacks]) ->
      Found = ?config(Key, Config),
      case Found =:= undefined of
          true -> get_ct_config_with_fallback(Config, Fallbacks);
          false -> Found
      end.
  %% @doc Same as get_config_by_kind/3, reading the kind from the `bridge_kind'
  %% entry of `Config'. When that entry is absent the kind defaults to `action',
  %% like every other helper in this module that reads `bridge_kind'.
  get_config_by_kind(Config, Overrides) ->
      Kind = proplists:get_value(bridge_kind, Config, action),
      get_config_by_kind(Kind, Config, Overrides).

  %% @doc Returns `#{type, name, config}' for the action or source described by
  %% `Config', with `Overrides' deep-merged into the stored config.
  %%
  %% For `action', the `action_*' keys are preferred and the legacy `bridge_*'
  %% keys are used as fallback. For `source', only the `source_*' keys are read.
  get_config_by_kind(Kind, Config, Overrides) ->
      case Kind of
          action ->
              %% TODO: refactor tests to use action_type...
              ActionType = get_ct_config_with_fallback(Config, [action_type, bridge_type]),
              ActionName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
              ActionConfig0 = get_ct_config_with_fallback(Config, [action_config, bridge_config]),
              ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides),
              #{type => ActionType, name => ActionName, config => ActionConfig};
          source ->
              SourceType = ?config(source_type, Config),
              SourceName = ?config(source_name, Config),
              SourceConfig0 = ?config(source_config, Config),
              SourceConfig = emqx_utils_maps:deep_merge(SourceConfig0, Overrides),
              #{type => SourceType, name => SourceName, config => SourceConfig}
      end.
  %% @doc Maps a bridge kind to its HTTP API path root: `action' gives
  %% "actions", `source' gives "sources".
  api_path_root(Kind) ->
      case Kind of
          source -> "sources";
          action -> "actions"
      end.
  %% @doc Maps a bridge kind to its config root key macro:
  %% `action' gives `?ROOT_KEY_ACTIONS', `source' gives `?ROOT_KEY_SOURCES'.
  conf_root_key(Kind) ->
      case Kind of
          source -> ?ROOT_KEY_SOURCES;
          action -> ?ROOT_KEY_ACTIONS
      end.
  %% @doc Tries to JSON-decode `X' into maps; returns `X' unchanged when
  %% decoding fails.
  maybe_json_decode(X) ->
      Outcome = emqx_utils_json:safe_decode(X, [return_maps]),
      case Outcome of
          {error, _} -> X;
          {ok, Decoded} -> Decoded
      end.
  %% @doc Performs an authenticated management API request and post-processes
  %% the body.
  %%
  %% On `{ok, {Status, Headers, Body}}' the body is JSON-decoded when possible.
  %% On `{error, {Status, Headers, Body}}' the body is JSON-decoded when
  %% possible and, if it contains a `<<"message">>' field, that field is
  %% JSON-decoded too when possible. Any other result is returned untouched.
  request(Method, Path, Params) ->
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      ReqOpts = #{return_all => true},
      Reply = emqx_mgmt_api_test_util:request_api(Method, Path, "", Auth, Params, ReqOpts),
      case Reply of
          {ok, {Status, Headers, RawBody}} ->
              {ok, {Status, Headers, maybe_json_decode(RawBody)}};
          {error, {Status, Headers, RawBody}} ->
              %% try_decode_error/1 decodes the body and its nested message.
              {error, {Status, Headers, try_decode_error(RawBody)}};
          Other ->
              Other
      end.
  %% @doc Lists actions through the HTTP API ("actions" path), logging the call
  %% and its result. A successful body is JSON-decoded into maps; any other
  %% result is returned as received.
  list_bridges_api() ->
      Path = emqx_mgmt_api_test_util:api_path(["actions"]),
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      ReqOpts = #{return_all => true},
      ct:pal("listing bridges (via http)"),
      Reply = emqx_mgmt_api_test_util:request_api(get, Path, "", Auth, [], ReqOpts),
      Res =
          case Reply of
              {ok, {Status, Headers, RawBody}} ->
                  Decoded = emqx_utils_json:decode(RawBody, [return_maps]),
                  {ok, {Status, Headers, Decoded}};
              Other ->
                  Other
          end,
      ct:pal("list bridges result: ~p", [Res]),
      Res.
  %% @doc Fetches a source through the HTTP API; shorthand for
  %% get_bridge_api/3 with kind `source'.
  get_source_api(BridgeType, BridgeName) ->
      Kind = source,
      get_bridge_api(Kind, BridgeType, BridgeName).
  %% @doc Fetches an action through the HTTP API; shorthand for
  %% get_bridge_api/3 with kind `action'.
  get_bridge_api(BridgeType, BridgeName) ->
      get_bridge_api(action, BridgeType, BridgeName).

  %% @doc Issues a GET on `<root>/<BridgeId>' where the root is "sources" or
  %% "actions" depending on `BridgeKind'. Logs the call and its result and
  %% returns the result of request/3.
  get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
      BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
      Root = api_path_root(BridgeKind),
      Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
      Ident = {BridgeKind, BridgeType, BridgeName},
      ct:pal("get bridge ~p (via http)", [Ident]),
      Res = request(get, Path, []),
      ct:pal("get bridge ~p result: ~p", [Ident, Res]),
      Res.
  %% @doc Same as create_bridge_api/2 without overrides.
  create_bridge_api(Config) ->
      create_bridge_api(Config, #{}).

  %% @doc Creates the connector via HTTP (which must answer 201) and then the
  %% action/source via create_kind_api/2, whose result is returned.
  create_bridge_api(Config, Overrides) ->
      ConnectorRes = create_connector_api(Config),
      {ok, {{_, 201, _}, _, _}} = ConnectorRes,
      create_kind_api(Config, Overrides).
  %% @doc Same as create_kind_api/2 without overrides.
  create_kind_api(Config) ->
      create_kind_api(Config, #{}).

  %% @doc POSTs the action or source described by `Config' (kind taken from
  %% `bridge_kind', default `action') to the matching API root. The request body
  %% is the merged config plus `<<"type">>' and `<<"name">>'. Logs the request
  %% and result and returns the result of request/3.
  create_kind_api(Config, Overrides) ->
      Kind = proplists:get_value(bridge_kind, Config, action),
      KindConf = get_config_by_kind(Kind, Config, Overrides),
      #{type := Type, name := Name, config := BridgeConfig} = KindConf,
      Params = BridgeConfig#{<<"type">> => Type, <<"name">> => Name},
      Path = emqx_mgmt_api_test_util:api_path([api_path_root(Kind)]),
      ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]),
      Res = request(post, Path, Params),
      ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
      Res.
  %% @doc Same as create_connector_api/2 without overrides.
  create_connector_api(Config) ->
      create_connector_api(Config, #{}).

  %% @doc Creates the connector described by `connector_name',
  %% `connector_type' and `connector_config' in `Config', with `Overrides'
  %% deep-merged into the config.
  create_connector_api(Config, Overrides) ->
      BaseConfig = ?config(connector_config, Config),
      Name = ?config(connector_name, Config),
      Type = ?config(connector_type, Config),
      Merged = emqx_utils_maps:deep_merge(BaseConfig, Overrides),
      create_connector_api(Name, Type, Merged).

  %% @doc POSTs `ConnectorConfig' plus `<<"type">>' and `<<"name">>' to the
  %% "connectors" API. Logs the request and result and returns the result of
  %% request/3.
  create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
      Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
      Identity = #{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
      %% The identity keys take precedence over the same keys in the config.
      Params = maps:merge(ConnectorConfig, Identity),
      ct:pal("creating connector (http):\n ~p", [Params]),
      Res = request(post, Path, Params),
      ct:pal("connector create (http) result:\n ~p", [Res]),
      Res.
  %% @doc PUTs `ConnectorConfig' to `connectors/<ConnectorId>'. Logs the request
  %% and result and returns the result of request/3.
  update_connector_api(ConnectorName, ConnectorType, ConnectorConfig) ->
      Id = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
      Path = emqx_mgmt_api_test_util:api_path(["connectors", Id]),
      ct:pal("updating connector ~s (http):\n ~p", [Id, ConnectorConfig]),
      Res = request(put, Path, ConnectorConfig),
      ct:pal("connector update (http) result:\n ~p", [Res]),
      Res.
  %% @doc POSTs an empty body to `connectors/<ConnectorId>/start'. Logs the
  %% request and result and returns the result of request/3.
  start_connector_api(ConnectorName, ConnectorType) ->
      ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
      Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "start"]),
      ct:pal("starting connector ~s (http)", [ConnectorId]),
      Res = request(post, Path, #{}),
      %% This line used to say "connector update", which made start results
      %% indistinguishable from update results in the CT logs.
      ct:pal("connector start (http) result:\n ~p", [Res]),
      Res.
  %% @doc GETs `connectors/<ConnectorId>'. Logs the request and result and
  %% returns the result of request/3.
  get_connector_api(ConnectorType, ConnectorName) ->
      Id = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
      Path = emqx_mgmt_api_test_util:api_path(["connectors", Id]),
      ct:pal("get connector ~s (http)", [Id]),
      NoParams = [],
      Res = request(get, Path, NoParams),
      ct:pal("get connector (http) result:\n ~p", [Res]),
      Res.
  %% @doc Same as create_action_api/2 without overrides.
  create_action_api(Config) ->
      create_action_api(Config, #{}).

  %% @doc POSTs the action described by `action_name', `action_type' and
  %% `action_config' (deep-merged with `Overrides') to the "actions" API. Logs
  %% the request and result and returns the result of request/3.
  create_action_api(Config, Overrides) ->
      Name = ?config(action_name, Config),
      Type = ?config(action_type, Config),
      Merged = emqx_utils_maps:deep_merge(?config(action_config, Config), Overrides),
      Params = Merged#{<<"type">> => Type, <<"name">> => Name},
      Path = emqx_mgmt_api_test_util:api_path(["actions"]),
      ct:pal("creating action (http):\n ~p", [Params]),
      Res = request(post, Path, Params),
      ct:pal("action create (http) result:\n ~p", [Res]),
      Res.
  %% @doc GETs `actions/<ActionId>' for the action named by `action_name' and
  %% `action_type' in `Config'. Logs the request and result and returns the
  %% result of request/3.
  get_action_api(Config) ->
      Name = ?config(action_name, Config),
      Type = ?config(action_type, Config),
      Id = emqx_bridge_resource:bridge_id(Type, Name),
      Path = emqx_mgmt_api_test_util:api_path(["actions", Id]),
      ct:pal("getting action (http)"),
      Res = request(get, Path, []),
      ct:pal("get action (http) result:\n ~p", [Res]),
      Res.
  %% @doc Same as update_bridge_api/2 without overrides.
  update_bridge_api(Config) ->
      update_bridge_api(Config, #{}).

  %% @doc PUTs the (merged) config of the action or source described by `Config'
  %% to `<root>/<BridgeId>'; the kind comes from `bridge_kind' and defaults to
  %% `action'. Logs the request and result and returns the result of request/3.
  update_bridge_api(Config, Overrides) ->
      Kind = proplists:get_value(bridge_kind, Config, action),
      KindConf = get_config_by_kind(Kind, Config, Overrides),
      #{type := Type, name := Name, config := Params} = KindConf,
      Id = emqx_bridge_resource:bridge_id(Type, Name),
      Path = emqx_mgmt_api_test_util:api_path([api_path_root(Kind), Id]),
      ct:pal("updating bridge (~s, http):\n ~p", [Kind, Params]),
      Res = request(put, Path, Params),
      ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]),
      Res.
  %% @doc Same as op_bridge_api/4 with kind `action'.
  op_bridge_api(Op, BridgeType, BridgeName) ->
      op_bridge_api(action, Op, BridgeType, BridgeName).

  %% @doc POSTs to `<root>/<BridgeId>/<Op>' with no parameters. Logs the call
  %% and result and returns the result of request/3.
  op_bridge_api(Kind, Op, BridgeType, BridgeName) ->
      Id = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
      Root = api_path_root(Kind),
      Path = emqx_mgmt_api_test_util:api_path([Root, Id, Op]),
      ct:pal("calling bridge ~p (~s, http):\n ~p", [Id, Kind, Op]),
      Res = request(post, Path, []),
      ct:pal("bridge op result:\n ~p", [Res]),
      Res.
  %% @doc Same as probe_bridge_api/2 without overrides.
  probe_bridge_api(Config) ->
      probe_bridge_api(Config, #{}).

  %% @doc Probes the bridge described by `bridge_type', `bridge_name' and
  %% `bridge_config' in `Config', with `Overrides' deep-merged into the config.
  probe_bridge_api(Config, Overrides) ->
      Type = ?config(bridge_type, Config),
      Name = ?config(bridge_name, Config),
      Merged = emqx_utils_maps:deep_merge(?config(bridge_config, Config), Overrides),
      probe_bridge_api(Type, Name, Merged).

  %% @doc Same as probe_bridge_api/4 with kind `action'.
  probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
      probe_bridge_api(action, BridgeType, BridgeName, BridgeConfig).

  %% @doc POSTs `BridgeConfig' plus `<<"type">>' and `<<"name">>' to the
  %% `<root>_probe' API, where the root depends on `Kind'. Logs the request and
  %% result and returns the result of request/3.
  probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) ->
      Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
      ProbeSegment = api_path_root(Kind) ++ "_probe",
      Path = emqx_mgmt_api_test_util:api_path([ProbeSegment]),
      ct:pal("probing bridge (~s, http):\n ~p", [Kind, Params]),
      Res = request(post, Path, Params),
      ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]),
      Res.
  %% @doc Same as probe_connector_api/2 without overrides.
  probe_connector_api(Config) ->
      probe_connector_api(Config, #{}).

  %% @doc POSTs the connector config from `Config' (required key
  %% `connector_config', deep-merged with `Overrides') plus `<<"type">>' and
  %% `<<"name">>' to the "connectors_probe" API. Logs the request and result and
  %% returns the result of request/3.
  probe_connector_api(Config, Overrides) ->
      Common = get_common_values(Config),
      #{connector_type := Type, connector_name := Name} = Common,
      BaseConfig = get_value(connector_config, Config),
      Merged = emqx_utils_maps:deep_merge(BaseConfig, Overrides),
      Params = Merged#{<<"type">> => Type, <<"name">> => Name},
      Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]),
      ct:pal("probing connector (~s, http):\n ~p", [Type, Params]),
      Res = request(post, Path, Params),
      ct:pal("probing connector (~s, http) result:\n ~p", [Type, Res]),
      Res.
  %% @doc GETs the "bridges" (v1) API, logging the call and its result. Returns
  %% the result of request/3.
  list_bridges_http_api_v1() ->
      Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
      ct:pal("list bridges (http v1)"),
      NoParams = [],
      Res = request(get, Path, NoParams),
      ct:pal("list bridges (http v1) result:\n ~p", [Res]),
      Res.
  %% @doc GETs the "actions" API, logging the call and its result. Returns the
  %% result of request/3.
  list_actions_http_api() ->
      Path = emqx_mgmt_api_test_util:api_path(["actions"]),
      ct:pal("list actions (http v2)"),
      NoParams = [],
      Res = request(get, Path, NoParams),
      ct:pal("list actions (http v2) result:\n ~p", [Res]),
      Res.
  %% @doc GETs the "sources" API, logging the call and its result. Returns the
  %% result of request/3.
  list_sources_http_api() ->
      Path = emqx_mgmt_api_test_util:api_path(["sources"]),
      ct:pal("list sources (http v2)"),
      NoParams = [],
      Res = request(get, Path, NoParams),
      ct:pal("list sources (http v2) result:\n ~p", [Res]),
      Res.
  %% @doc GETs the "connectors" API, logging the call and its result. Returns
  %% the result of request/3.
  list_connectors_http_api() ->
      Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
      ct:pal("list connectors"),
      NoParams = [],
      Res = request(get, Path, NoParams),
      ct:pal("list connectors result:\n ~p", [Res]),
      Res.
  %% @doc PUTs `Params' to `rules/<RuleId>', logging the request and its result.
  %% Returns the result of request/3.
  update_rule_http(RuleId, Params) ->
      RulePath = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
      ct:pal("update rule ~p:\n ~p", [RuleId, Params]),
      Outcome = request(put, RulePath, Params),
      ct:pal("update rule ~p result:\n ~p", [RuleId, Outcome]),
      Outcome.
  %% @doc Enables a rule over HTTP by updating it with `<<"enable">> => true'.
  enable_rule_http(RuleId) ->
      update_rule_http(RuleId, #{<<"enable">> => true}).
  %% @doc Returns the `enable' field of the rule fetched with
  %% `emqx_rule_engine:get_rule/1'. Fails with `badmatch' if the rule cannot be
  %% fetched.
  is_rule_enabled(RuleId) ->
      {ok, #{enable := IsEnabled}} = emqx_rule_engine:get_rule(RuleId),
      IsEnabled.
  %% @doc Best-effort decoding of an error body.
  %%
  %% Returns `Body0' unchanged when it is not valid JSON. Otherwise returns the
  %% decoded map; when that map has a `<<"message">>' field which itself is
  %% valid JSON, the field is replaced by its decoded form.
  try_decode_error(Body0) ->
      case emqx_utils_json:safe_decode(Body0, [return_maps]) of
          {error, _} ->
              Body0;
          {ok, #{<<"message">> := RawMsg} = Decoded} ->
              %% maybe_json_decode/1 falls back to the raw message on failure,
              %% which leaves the map unchanged.
              Decoded#{<<"message">> := maybe_json_decode(RawMsg)};
          {ok, Decoded} ->
              Decoded
      end.
  %% @doc POSTs an enabled rule built from the `sql' and `actions' entries of
  %% `Opts' (both required) to the "rules" API. Logs the request and result and
  %% returns the result of request/3.
  create_rule_api(Opts) ->
      #{sql := SQL, actions := RuleActions} = Opts,
      Params = #{enable => true, sql => SQL, actions => RuleActions},
      Path = emqx_mgmt_api_test_util:api_path(["rules"]),
      ct:pal("create rule:\n ~p", [Params]),
      Res = request(post, Path, Params),
      ct:pal("create rule results:\n ~p", [Res]),
      Res.
  %% @doc Same as create_rule_and_action_http/4 with empty options.
  create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
      create_rule_and_action_http(BridgeType, RuleTopic, Config, #{}).

  %% @doc Creates an enabled rule over HTTP whose only action is the bridge
  %% named by `bridge_name' in `Config'.
  %%
  %% Options:
  %% - `sql': rule SQL; defaults to `SELECT * FROM "<RuleTopic>"'.
  %% - `overrides': map deep-merged into the request parameters.
  %%
  %% On success returns `{ok, DecodedBody}' and registers an on-exit action
  %% that deletes the rule; any other result is returned as received.
  create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
      BridgeName = ?config(bridge_name, Config),
      BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
      %% Built eagerly, so `RuleTopic' must be a binary even when `sql' is given.
      DefaultSQL = <<"SELECT * FROM \"", RuleTopic/binary, "\"">>,
      BaseParams = #{
          enable => true,
          sql => maps:get(sql, Opts, DefaultSQL),
          actions => [BridgeId]
      },
      Params = emqx_utils_maps:deep_merge(BaseParams, maps:get(overrides, Opts, #{})),
      Path = emqx_mgmt_api_test_util:api_path(["rules"]),
      Auth = emqx_mgmt_api_test_util:auth_header_(),
      ct:pal("rule action params: ~p", [Params]),
      case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Params) of
          {ok, RawBody} ->
              Rule = #{<<"id">> := RuleId} = emqx_utils_json:decode(RawBody, [return_maps]),
              on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
              {ok, Rule};
          Other ->
              Other
      end.
  %% @doc GETs `schemas/<Root>' with an empty auth header list and returns the
  %% `components.schemas' map of the decoded body. Anything other than a 200
  %% response fails with `case_clause'.
  api_spec_schemas(Root) ->
      Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]),
      ReqOpts = #{return_all => true},
      NoAuth = [],
      NoParams = [],
      case emqx_mgmt_api_test_util:request_api(get, Path, "", NoAuth, NoParams, ReqOpts) of
          {ok, {{_, 200, _}, _, RawBody}} ->
              #{<<"components">> := #{<<"schemas">> := Schemas}} =
                  emqx_utils_json:decode(RawBody, [return_maps]),
              Schemas
      end.
  %% @doc Returns the API spec schemas published under the "bridges" root.
  bridges_api_spec_schemas() ->
      Root = "bridges",
      api_spec_schemas(Root).
  %% @doc Returns the API spec schemas published under the "actions" root.
  actions_api_spec_schemas() ->
      Root = "actions",
      api_spec_schemas(Root).
  %% @doc Returns the value stored under `Key' in the proplist `Config'.
  %% Raises `error({missing_required_config, Key, Config})' when the key is
  %% absent or its value is `undefined'.
  get_value(Key, Config) ->
      Found = proplists:get_value(Key, Config),
      Found =/= undefined orelse error({missing_required_config, Key, Config}),
      Found.
  %% @doc Collects the identifying values of the action or source described by
  %% `Config' into a map with keys `conf_root_key', `kind', `type', `name',
  %% `connector_type' and `connector_name'.
  %%
  %% The kind is read from `bridge_kind' (default `action'). For actions, the
  %% type and name come from `action_*' with `bridge_*' as fallback; for sources
  %% the `source_*' keys are required. The connector keys are always required.
  get_common_values(Config) ->
      Kind = proplists:get_value(bridge_kind, Config, action),
      {RootKey, Type, Name} =
          case Kind of
              source ->
                  {
                      sources,
                      get_value(source_type, Config),
                      get_value(source_name, Config)
                  };
              action ->
                  {
                      actions,
                      get_ct_config_with_fallback(Config, [action_type, bridge_type]),
                      get_ct_config_with_fallback(Config, [action_name, bridge_name])
                  }
          end,
      ConnectorType = get_value(connector_type, Config),
      ConnectorName = get_value(connector_name, Config),
      #{
          conf_root_key => RootKey,
          kind => Kind,
          type => Type,
          name => Name,
          connector_type => ConnectorType,
          connector_name => ConnectorName
      }.
  %% @doc Returns the resource id of the connector described by `Config', as
  %% computed by `emqx_connector_resource:resource_id/2'.
  connector_resource_id(Config) ->
      Common = get_common_values(Config),
      #{connector_type := ConnType, connector_name := ConnName} = Common,
      emqx_connector_resource:resource_id(ConnType, ConnName).
  %% @doc Runs `emqx_resource_manager:channel_health_check/2' for the channel
  %% (action or source) described by `Config' on its connector.
  health_check_channel(Config) ->
      ConnResId = connector_resource_id(Config),
      ChanResId = resource_id(Config),
      emqx_resource_manager:channel_health_check(ConnResId, ChanResId).
  571. %%------------------------------------------------------------------------------
  572. %% Internal export
  573. %%------------------------------------------------------------------------------
  %% @doc Hook callback: forwards `Message' to `TestPid' as
  %% `{consumed_message, Message}' and returns `ok'.
  source_hookpoint_callback(Message, TestPid) ->
      _ = erlang:send(TestPid, {consumed_message, Message}),
      ok.
  577. %%------------------------------------------------------------------------------
  578. %% Testcases
  579. %%------------------------------------------------------------------------------
  %% @doc Generic test case for a synchronous query.
  %%
  %% Creates the connector and bridge over HTTP, waits (up to 20 attempts, one
  %% second apart) for the resource to report `connected', sends
  %% `{BridgeId, MakeMessageFun()}' with `emqx_resource:simple_sync_query/2' and
  %% hands the result to `IsSuccessCheck'. The trace must then contain exactly
  %% one `TracePoint' event carrying the resource id as `instance_id'.
  t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
      ?check_trace(
          begin
              ?assertMatch({ok, _}, create_bridge_api(Config)),
              ResId = resource_id(Config),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResId))
              ),
              Request = {bridge_id(Config), MakeMessageFun()},
              QueryResult = emqx_resource:simple_sync_query(ResId, Request),
              IsSuccessCheck(QueryResult),
              ok
          end,
          fun(Trace) ->
              ResId = resource_id(Config),
              ?assertMatch([#{instance_id := ResId}], ?of_kind(TracePoint, Trace))
          end
      ),
      ok.
  %% @doc Generic test case for an asynchronous query.
  %%
  %% Creates the connector and bridge over HTTP, waits for the resource to
  %% report `connected', then issues `emqx_bridge_v2:query/4' with an async
  %% reply fun that sends `{result, Result}' back to the test process, waiting
  %% up to 5 seconds for a `TracePoint' event of this resource. The trace must
  %% contain exactly one such event. Finally the async result (awaited for up to
  %% 8 seconds, otherwise `timeout' is thrown) is handed to `IsSuccessCheck'.
  t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
      ReplyFun = fun(ReplyTo, Outcome) -> ReplyTo ! {result, Outcome} end,
      ?check_trace(
          begin
              ?assertMatch({ok, _}, create_bridge_api(Config)),
              ResId = resource_id(Config),
              ?retry(
                  _Sleep = 1_000,
                  _Attempts = 20,
                  ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResId))
              ),
              BridgeId = bridge_id(Config),
              Type = ?config(bridge_type, Config),
              Name = ?config(bridge_name, Config),
              Request = {BridgeId, MakeMessageFun()},
              ?assertMatch(
                  {ok, {ok, _}},
                  ?wait_async_action(
                      emqx_bridge_v2:query(Type, Name, Request, #{
                          async_reply_fun => {ReplyFun, [self()]}
                      }),
                      #{?snk_kind := TracePoint, instance_id := ResId},
                      5_000
                  )
              ),
              ok
          end,
          fun(Trace) ->
              ResId = resource_id(Config),
              ?assertMatch([#{instance_id := ResId}], ?of_kind(TracePoint, Trace))
          end
      ),
      receive
          {result, AsyncResult} -> IsSuccessCheck(AsyncResult)
      after 8_000 ->
          throw(timeout)
      end,
      ok.
  %% @doc Generic test case for a consumer (source).
  %%
  %% Required keys of `Opts':
  %% - `consumer_ready_tracepoint': a predicate fun, or a
  %%   `{Predicate, NEvents}' tuple, subscribed to with `snabbkaffe:subscribe/3'
  %%   before the bridge is created; the events must arrive after creation.
  %% - `produce_fn': passed to `snabbkaffe:wait_async_action/3' as the action
  %%   that produces a message in the remote system that shall be consumed.
  %% - `produce_tracepoint': passed to `snabbkaffe:wait_async_action/3' as the
  %%   event to wait for (up to 15 seconds); marks the end of consumed message
  %%   processing.
  %% - `check_fn': called with the consumed message received via the source
  %%   hookpoint (waited for up to 5 seconds).
  %%
  %% Optional key: `consumer_ready_timeout' (default 15_000).
  t_consume(Config, Opts) ->
      #{
          consumer_ready_tracepoint := ConsumerReadyTPFn,
          produce_fn := ProduceFn,
          check_fn := CheckFn,
          produce_tracepoint := TracePointFn
      } = Opts,
      ?check_trace(
          begin
              ReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
              {ReadyPredicate, ReadyCount} =
                  case ConsumerReadyTPFn of
                      {Pred, Count} when is_function(Pred) -> {Pred, Count};
                      Pred when is_function(Pred) -> {Pred, 1}
                  end,
              {ok, SRef0} = snabbkaffe:subscribe(ReadyPredicate, ReadyCount, ReadyTimeout),
              ?assertMatch({ok, _}, create_bridge_api(Config)),
              ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
              ok = add_source_hookpoint(Config),
              ?retry(
                  _Sleep = 200,
                  _Attempts = 20,
                  ?assertMatch(
                      #{status := ?status_connected},
                      health_check_channel(Config)
                  )
              ),
              ?assertMatch(
                  {_, {ok, _}},
                  snabbkaffe:wait_async_action(ProduceFn, TracePointFn, 15_000)
              ),
              receive
                  {consumed_message, Consumed} ->
                      CheckFn(Consumed)
              after 5_000 ->
                  error({timeout, process_info(self(), messages)})
              end,
              ok
          end,
          []
      ),
      ok.
  %% @doc Generic test case: creates the connector and bridge over HTTP, updates
  %% the bridge with its unchanged config, and checks that the v1 bridge listing
  %% API still answers with status 200.
  t_create_via_http(Config) ->
      ?check_trace(
          begin
              ?assertMatch({ok, _}, create_bridge_api(Config)),
              ?assertMatch({ok, _}, update_bridge_api(Config)),
              %% check that v1 list API is fine
              ?assertMatch({ok, {{_, 200, _}, _, _}}, list_bridges_http_api_v1()),
              ok
          end,
          []
      ),
      ok.
  %% Exercises the probe / create / start / stop life cycle of a bridge.
  %%
  %% `Config' is the test configuration proplist (it must provide
  %% `connector_name' and `connector_type'; `bridge_kind' defaults to
  %% `action').  `StopTracePoint' is the snabbkaffe event kind that is
  %% expected to be emitted whenever the underlying resource is stopped.
  %%
  %% Steps, in order: create the connector, probe the bridge twice (the
  %% second probe must not grow the atom table), create the bridge, wait for
  %% it to become `connected', issue a redundant `start', and finally disable
  %% the connector while waiting for `StopTracePoint'.  The trace check then
  %% expects exactly three stop events, the last of which carries the
  %% bridge's resource id.
  t_start_stop(Config, StopTracePoint) ->
      Kind = proplists:get_value(bridge_kind, Config, action),
      ConnectorName = ?config(connector_name, Config),
      ConnectorType = ?config(connector_type, Config),
      #{type := Type, name := Name, config := BridgeConfig} =
          get_config_by_kind(Kind, Config, #{}),
      ?assertMatch({ok, {{_, 201, _}, _, _}}, create_connector_api(Config)),
      Probe = fun() -> probe_bridge_api(Kind, Type, Name, BridgeConfig) end,
      ?check_trace(
          begin
              ?assertMatch({ok, {{_, 204, _}, _, _}}, Probe()),
              %% The probe API must not leak atoms: probing a second time
              %% has to leave the atom count untouched.
              AtomCount0 = erlang:system_info(atom_count),
              ?assertMatch({ok, {{_, 204, _}, _, _}}, Probe()),
              AtomCount1 = erlang:system_info(atom_count),
              ?assertEqual(AtomCount0, AtomCount1),
              ?assertMatch({ok, _}, create_kind_api(Config)),
              ResourceId = emqx_bridge_resource:resource_id(conf_root_key(Kind), Type, Name),
              %% Polls the health check once per second, up to 20 times,
              %% until the resource reports `connected'.
              AwaitConnected = fun() ->
                  ?retry(
                      1_000,
                      20,
                      ?assertEqual(
                          {ok, connected},
                          emqx_resource_manager:health_check(ResourceId)
                      )
                  )
              end,
              %% Connecting is asynchronous, so give it some time to settle
              %% and avoid flakiness.
              AwaitConnected(),
              %% `start` an already running bridge to trigger `already_started`.
              ?assertMatch(
                  {ok, {{_, 204, _}, _, []}},
                  op_bridge_api(Kind, "start", Type, Name)
              ),
              ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
              %% Stopping and restarting through the bridge API is not
              %% supported anymore, so those steps are not exercised here;
              %% we only re-confirm that the resource is still connected.
              AwaitConnected(),
              %% Disable the connector, which will also stop it.
              ?assertMatch(
                  {{ok, _}, {ok, _}},
                  ?wait_async_action(
                      emqx_connector:disable_enable(disable, ConnectorType, ConnectorName),
                      #{?snk_kind := StopTracePoint},
                      5_000
                  )
              ),
              #{resource_id => ResourceId}
          end,
          fun(#{resource_id := ResourceId}, Trace) ->
              %% One stop event for each probe, one for the real resource.
              ?assertMatch(
                  [_, _, #{instance_id := ResourceId}],
                  ?of_kind(StopTracePoint, Trace)
              ),
              ok
          end
      ),
      ok.
  %% Convenience entry point: runs the status test with an empty options map,
  %% i.e. with every option at its default.
  t_on_get_status(Config) ->
      DefaultOpts = #{},
      t_on_get_status(Config, DefaultOpts).
  %% Checks the status reported by the resource health check.
  %%
  %% `Config' is the test configuration proplist; `proxy_host', `proxy_port'
  %% and `proxy_name' are read from it.  `Opts' may carry `failure_status'
  %% (default `disconnected'), the status expected while the failure is
  %% injected.
  %%
  %% After creating the bridge, the resource must become `connected'.  When
  %% a proxy host is configured, the test additionally injects a `down'
  %% failure through `emqx_common_test_helpers:with_failure/5', expects the
  %% health check to report `failure_status' meanwhile, and then expects the
  %% resource to report `connected' again.
  t_on_get_status(Config, Opts) ->
      ProxyPort = ?config(proxy_port, Config),
      ProxyHost = ?config(proxy_host, Config),
      ProxyName = ?config(proxy_name, Config),
      FailureStatus = maps:get(failure_status, Opts, disconnected),
      ?assertMatch({ok, _}, create_bridge(Config)),
      ResourceId = resource_id(Config),
      %% Polls the health check every `Interval' ms, at most `Attempts'
      %% times, until it returns `{ok, Status}'.
      AwaitStatus = fun(Interval, Attempts, Status) ->
          ?retry(
              Interval,
              Attempts,
              ?assertEqual({ok, Status}, emqx_resource_manager:health_check(ResourceId))
          )
      end,
      %% Connecting is asynchronous, so give it some time to settle and
      %% avoid flakiness.
      AwaitStatus(1_000, 20, connected),
      case ProxyHost =/= undefined of
          true ->
              emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
                  AwaitStatus(100, 20, FailureStatus)
              end),
              %% Check that it recovers by itself.
              AwaitStatus(1_000, 20, connected);
          false ->
              %% No proxy configured: the failure scenario is not run.
              ok
      end,
      ok.