emqx_bridge_testlib.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_testlib).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -import(emqx_common_test_helpers, [on_exit/1]).
  11. %% ct setup helpers
  12. init_per_suite(Config, Apps) ->
  13. [{start_apps, Apps} | Config].
  14. end_per_suite(Config) ->
  15. emqx_mgmt_api_test_util:end_suite(),
  16. ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
  17. ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?config(start_apps, Config))),
  18. _ = application:stop(emqx_connector),
  19. ok.
  20. init_per_group(TestGroup, BridgeType, Config) ->
  21. ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
  22. ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
  23. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  24. application:load(emqx_bridge),
  25. ok = emqx_common_test_helpers:start_apps([emqx_conf]),
  26. ok = emqx_connector_test_helpers:start_apps(?config(start_apps, Config)),
  27. {ok, _} = application:ensure_all_started(emqx_connector),
  28. emqx_mgmt_api_test_util:init_suite(),
  29. UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
  30. MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>,
  31. [
  32. {proxy_host, ProxyHost},
  33. {proxy_port, ProxyPort},
  34. {mqtt_topic, MQTTTopic},
  35. {test_group, TestGroup},
  36. {bridge_type, BridgeType}
  37. | Config
  38. ].
  39. end_per_group(Config) ->
  40. ProxyHost = ?config(proxy_host, Config),
  41. ProxyPort = ?config(proxy_port, Config),
  42. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  43. delete_all_bridges(),
  44. ok.
  45. init_per_testcase(TestCase, Config0, BridgeConfigCb) ->
  46. ct:timetrap(timer:seconds(60)),
  47. delete_all_bridges(),
  48. UniqueNum = integer_to_binary(erlang:unique_integer()),
  49. BridgeTopic =
  50. <<
  51. (atom_to_binary(TestCase))/binary,
  52. UniqueNum/binary
  53. >>,
  54. TestGroup = ?config(test_group, Config0),
  55. Config = [{bridge_topic, BridgeTopic} | Config0],
  56. {Name, ConfigString, BridgeConfig} = BridgeConfigCb(
  57. TestCase, TestGroup, Config
  58. ),
  59. ok = snabbkaffe:start_trace(),
  60. [
  61. {bridge_name, Name},
  62. {bridge_config_string, ConfigString},
  63. {bridge_config, BridgeConfig}
  64. | Config
  65. ].
  66. end_per_testcase(_Testcase, Config) ->
  67. case proplists:get_bool(skip_does_not_apply, Config) of
  68. true ->
  69. ok;
  70. false ->
  71. ProxyHost = ?config(proxy_host, Config),
  72. ProxyPort = ?config(proxy_port, Config),
  73. emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
  74. delete_all_bridges(),
  75. %% in CI, apparently this needs more time since the
  76. %% machines struggle with all the containers running...
  77. emqx_common_test_helpers:call_janitor(60_000),
  78. ok = snabbkaffe:stop(),
  79. ok
  80. end.
  81. delete_all_bridges() ->
  82. lists:foreach(
  83. fun(#{name := Name, type := Type}) ->
  84. ok = emqx_bridge:remove(Type, Name)
  85. end,
  86. emqx_bridge:list()
  87. ).
  88. %% test helpers
  89. parse_and_check(BridgeType, BridgeName, ConfigString) ->
  90. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  91. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  92. #{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf,
  93. BridgeConfig.
  94. resource_id(Config) ->
  95. BridgeKind = proplists:get_value(bridge_kind, Config, action),
  96. ConfRootKey =
  97. case BridgeKind of
  98. action -> actions;
  99. source -> sources
  100. end,
  101. BridgeType = ?config(bridge_type, Config),
  102. BridgeName = ?config(bridge_name, Config),
  103. emqx_bridge_resource:resource_id(ConfRootKey, BridgeType, BridgeName).
  104. create_bridge(Config) ->
  105. create_bridge(Config, _Overrides = #{}).
  106. create_bridge(Config, Overrides) ->
  107. BridgeType = ?config(bridge_type, Config),
  108. BridgeName = ?config(bridge_name, Config),
  109. BridgeConfig0 = ?config(bridge_config, Config),
  110. BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
  111. ct:pal("creating bridge with config: ~p", [BridgeConfig]),
  112. emqx_bridge:create(BridgeType, BridgeName, BridgeConfig).
  113. list_bridges_api() ->
  114. Params = [],
  115. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  116. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  117. Opts = #{return_all => true},
  118. ct:pal("listing bridges (via http)"),
  119. Res =
  120. case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
  121. {ok, {Status, Headers, Body0}} ->
  122. {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
  123. Error ->
  124. Error
  125. end,
  126. ct:pal("list bridge result: ~p", [Res]),
  127. Res.
  128. create_bridge_api(Config) ->
  129. create_bridge_api(Config, _Overrides = #{}).
  130. create_bridge_api(Config, Overrides) ->
  131. BridgeType = ?config(bridge_type, Config),
  132. BridgeName = ?config(bridge_name, Config),
  133. BridgeConfig0 = ?config(bridge_config, Config),
  134. BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
  135. create_bridge_api(BridgeType, BridgeName, BridgeConfig).
  136. create_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
  137. Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
  138. Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
  139. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  140. Opts = #{return_all => true},
  141. ct:pal("creating bridge (via http): ~p", [Params]),
  142. Res =
  143. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
  144. {ok, {Status, Headers, Body0}} ->
  145. {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
  146. Error ->
  147. Error
  148. end,
  149. ct:pal("bridge create result: ~p", [Res]),
  150. Res.
  151. update_bridge_api(Config) ->
  152. update_bridge_api(Config, _Overrides = #{}).
  153. update_bridge_api(Config, Overrides) ->
  154. BridgeType = ?config(bridge_type, Config),
  155. Name = ?config(bridge_name, Config),
  156. BridgeConfig0 = ?config(bridge_config, Config),
  157. BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
  158. BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name),
  159. Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name},
  160. Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
  161. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  162. Opts = #{return_all => true},
  163. ct:pal("updating bridge (via http): ~p", [Params]),
  164. Res =
  165. case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
  166. {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
  167. Error -> Error
  168. end,
  169. ct:pal("bridge update result: ~p", [Res]),
  170. Res.
  171. delete_bridge_http_api_v1(Opts) ->
  172. #{type := Type, name := Name} = Opts,
  173. BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
  174. Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
  175. ct:pal("deleting bridge (http v1)"),
  176. Res = emqx_bridge_v2_testlib:request(delete, Path, _Params = []),
  177. ct:pal("bridge delete (http v1) result:\n ~p", [Res]),
  178. Res.
  179. op_bridge_api(Op, BridgeType, BridgeName) ->
  180. BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
  181. Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
  182. ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
  183. Method = post,
  184. Params = [],
  185. Res = emqx_bridge_v2_testlib:request(Method, Path, Params),
  186. ct:pal("bridge op result: ~p", [Res]),
  187. Res.
  188. probe_bridge_api(Config) ->
  189. probe_bridge_api(Config, _Overrides = #{}).
  190. probe_bridge_api(Config, Overrides) ->
  191. BridgeType = ?config(bridge_type, Config),
  192. BridgeName = ?config(bridge_name, Config),
  193. BridgeConfig0 = ?config(bridge_config, Config),
  194. BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
  195. probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
  196. probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
  197. Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
  198. Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
  199. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  200. Opts = #{return_all => true},
  201. ct:pal("probing bridge (via http): ~p", [Params]),
  202. Res =
  203. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
  204. {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
  205. Error -> Error
  206. end,
  207. ct:pal("bridge probe result: ~p", [Res]),
  208. Res.
  209. try_decode_error(Body0) ->
  210. case emqx_utils_json:safe_decode(Body0, [return_maps]) of
  211. {ok, #{<<"message">> := Msg0} = Body1} ->
  212. case emqx_utils_json:safe_decode(Msg0, [return_maps]) of
  213. {ok, Msg1} -> Body1#{<<"message">> := Msg1};
  214. {error, _} -> Body1
  215. end;
  216. {ok, Body1} ->
  217. Body1;
  218. {error, _} ->
  219. Body0
  220. end.
  221. create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
  222. create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
  223. create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
  224. BridgeName = ?config(bridge_name, Config),
  225. BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
  226. create_rule_and_action(BridgeId, RuleTopic, Opts).
  227. create_rule_and_action(Action, RuleTopic, Opts) ->
  228. SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
  229. Params = #{
  230. enable => true,
  231. sql => SQL,
  232. actions => [Action]
  233. },
  234. Path = emqx_mgmt_api_test_util:api_path(["rules"]),
  235. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  236. ct:pal("rule action params: ~p", [Params]),
  237. case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
  238. {ok, Res0} ->
  239. Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
  240. on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
  241. {ok, Res};
  242. Error ->
  243. Error
  244. end.
  245. make_message(Config, MakeMessageFun) ->
  246. BridgeType = ?config(bridge_type, Config),
  247. case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
  248. true ->
  249. BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
  250. {BridgeId, MakeMessageFun()};
  251. false ->
  252. {send_message, MakeMessageFun()}
  253. end.
  254. %%------------------------------------------------------------------------------
  255. %% Testcases
  256. %%------------------------------------------------------------------------------
  257. t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
  258. ?check_trace(
  259. begin
  260. ?assertMatch({ok, _}, create_bridge_api(Config)),
  261. ResourceId = resource_id(Config),
  262. ?retry(
  263. _Sleep = 1_000,
  264. _Attempts = 20,
  265. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  266. ),
  267. Message = make_message(Config, MakeMessageFun),
  268. IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
  269. ok
  270. end,
  271. fun(Trace) ->
  272. ResourceId = resource_id(Config),
  273. ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
  274. end
  275. ),
  276. ok.
  277. t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
  278. ReplyFun =
  279. fun(Pid, Result) ->
  280. Pid ! {result, Result}
  281. end,
  282. ?check_trace(
  283. begin
  284. ?assertMatch({ok, _}, create_bridge_api(Config)),
  285. ResourceId = resource_id(Config),
  286. ?retry(
  287. _Sleep = 1_000,
  288. _Attempts = 20,
  289. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  290. ),
  291. Message = make_message(Config, MakeMessageFun),
  292. ?assertMatch(
  293. {ok, {ok, _}},
  294. ?wait_async_action(
  295. emqx_resource:query(ResourceId, Message, #{
  296. async_reply_fun => {ReplyFun, [self()]}
  297. }),
  298. #{?snk_kind := TracePoint, instance_id := ResourceId},
  299. 5_000
  300. )
  301. ),
  302. ok
  303. end,
  304. fun(Trace) ->
  305. ResourceId = resource_id(Config),
  306. ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
  307. end
  308. ),
  309. receive
  310. {result, Result} -> IsSuccessCheck(Result)
  311. after 5_000 ->
  312. throw(timeout)
  313. end,
  314. ok.
  315. t_create_via_http(Config) ->
  316. ?check_trace(
  317. begin
  318. ?assertMatch({ok, _}, create_bridge_api(Config)),
  319. %% lightweight matrix testing some configs
  320. ?assertMatch(
  321. {ok, _},
  322. update_bridge_api(
  323. Config
  324. )
  325. ),
  326. ?assertMatch(
  327. {ok, _},
  328. update_bridge_api(
  329. Config
  330. )
  331. ),
  332. ok
  333. end,
  334. []
  335. ),
  336. ok.
  337. t_start_stop(Config, StopTracePoint) ->
  338. BridgeType = ?config(bridge_type, Config),
  339. BridgeName = ?config(bridge_name, Config),
  340. BridgeConfig = ?config(bridge_config, Config),
  341. t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
  342. t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
  343. ?check_trace(
  344. begin
  345. %% Check that the bridge probe API doesn't leak atoms.
  346. ProbeRes0 = probe_bridge_api(
  347. BridgeType,
  348. BridgeName,
  349. BridgeConfig
  350. ),
  351. ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
  352. AtomsBefore = erlang:system_info(atom_count),
  353. %% Probe again; shouldn't have created more atoms.
  354. ProbeRes1 = probe_bridge_api(
  355. BridgeType,
  356. BridgeName,
  357. BridgeConfig
  358. ),
  359. ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
  360. AtomsAfter = erlang:system_info(atom_count),
  361. ?assertEqual(AtomsBefore, AtomsAfter),
  362. ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
  363. ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
  364. %% Since the connection process is async, we give it some time to
  365. %% stabilize and avoid flakiness.
  366. ?retry(
  367. _Sleep = 1_000,
  368. _Attempts = 20,
  369. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  370. ),
  371. %% `start` bridge to trigger `already_started`
  372. ?assertMatch(
  373. {ok, {{_, 204, _}, _Headers, []}},
  374. emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
  375. ),
  376. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
  377. ?assertMatch(
  378. {{ok, _}, {ok, _}},
  379. ?wait_async_action(
  380. emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName),
  381. #{?snk_kind := StopTracePoint},
  382. 5_000
  383. )
  384. ),
  385. ?assertEqual(
  386. {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
  387. ),
  388. ?assertMatch(
  389. {ok, {{_, 204, _}, _Headers, []}},
  390. emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName)
  391. ),
  392. ?assertEqual(
  393. {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
  394. ),
  395. ?assertMatch(
  396. {ok, {{_, 204, _}, _Headers, []}},
  397. emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
  398. ),
  399. ?retry(
  400. _Sleep = 1_000,
  401. _Attempts = 20,
  402. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  403. ),
  404. %% Disable the bridge, which will also stop it.
  405. ?assertMatch(
  406. {{ok, _}, {ok, _}},
  407. ?wait_async_action(
  408. emqx_bridge:disable_enable(disable, BridgeType, BridgeName),
  409. #{?snk_kind := StopTracePoint},
  410. 5_000
  411. )
  412. ),
  413. ok
  414. end,
  415. fun(Trace) ->
  416. ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
  417. %% one for each probe, two for real
  418. ?assertMatch(
  419. [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
  420. ?of_kind(StopTracePoint, Trace)
  421. ),
  422. ok
  423. end
  424. ),
  425. ok.
  426. t_on_get_status(Config) ->
  427. t_on_get_status(Config, _Opts = #{}).
  428. t_on_get_status(Config, Opts) ->
  429. ProxyPort = ?config(proxy_port, Config),
  430. ProxyHost = ?config(proxy_host, Config),
  431. ProxyName = ?config(proxy_name, Config),
  432. FailureStatus = maps:get(failure_status, Opts, disconnected),
  433. ?assertMatch({ok, _}, create_bridge(Config)),
  434. ResourceId = resource_id(Config),
  435. %% Since the connection process is async, we give it some time to
  436. %% stabilize and avoid flakiness.
  437. ?retry(
  438. _Sleep = 1_000,
  439. _Attempts = 20,
  440. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  441. ),
  442. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
  443. ct:sleep(500),
  444. ?retry(
  445. _Interval0 = 200,
  446. _Attempts0 = 10,
  447. ?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
  448. )
  449. end),
  450. %% Check that it recovers itself.
  451. ?retry(
  452. _Sleep = 1_000,
  453. _Attempts = 20,
  454. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  455. ),
  456. ok.