emqx_common_test_helpers.erl 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_common_test_helpers).
  17. -include_lib("emqx/include/emqx_authentication.hrl").
  18. -type special_config_handler() :: fun().
  19. -type apps() :: list(atom()).
  20. -export([
  21. all/1,
  22. init_per_testcase/3,
  23. end_per_testcase/3,
  24. boot_modules/1,
  25. start_apps/1,
  26. start_apps/2,
  27. start_apps/3,
  28. start_app/2,
  29. stop_apps/1,
  30. stop_apps/2,
  31. reload/2,
  32. app_path/2,
  33. proj_root/0,
  34. deps_path/2,
  35. flush/0,
  36. flush/1,
  37. load/1,
  38. render_and_load_app_config/1,
  39. render_and_load_app_config/2
  40. ]).
  41. -export([
  42. client_ssl/0,
  43. client_ssl/1,
  44. client_ssl_twoway/0,
  45. client_ssl_twoway/1,
  46. ensure_mnesia_stopped/0,
  47. ensure_quic_listener/2,
  48. ensure_quic_listener/3,
  49. is_all_tcp_servers_available/1,
  50. is_tcp_server_available/2,
  51. is_tcp_server_available/3,
  52. load_config/2,
  53. not_wait_mqtt_payload/1,
  54. read_schema_configs/2,
  55. render_config_file/2,
  56. wait_for/4,
  57. wait_mqtt_payload/1,
  58. select_free_port/1
  59. ]).
  60. -export([
  61. emqx_cluster/1,
  62. emqx_cluster/2,
  63. start_ekka/0,
  64. start_epmd/0,
  65. start_slave/2,
  66. stop_slave/1,
  67. listener_port/2
  68. ]).
  69. -export([clear_screen/0]).
  70. -export([with_mock/4]).
  71. -export([
  72. on_exit/1,
  73. call_janitor/0,
  74. call_janitor/1
  75. ]).
  76. %% Toxiproxy API
  77. -export([
  78. with_failure/5,
  79. enable_failure/4,
  80. heal_failure/4,
  81. reset_proxy/2
  82. ]).
  83. %% TLS certs API
  84. -export([
  85. gen_ca/2,
  86. gen_host_cert/3,
  87. gen_host_cert/4
  88. ]).
  89. -define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
  90. -define(MQTT_SSL_CLIENT_CERTS, [
  91. {keyfile, ?CERTS_PATH("client-key.pem")},
  92. {cacertfile, ?CERTS_PATH("cacert.pem")},
  93. {certfile, ?CERTS_PATH("client-cert.pem")}
  94. ]).
  95. -define(TLS_1_3_CIPHERS, [
  96. {versions, ['tlsv1.3']},
  97. {ciphers, [
  98. "TLS_AES_256_GCM_SHA384",
  99. "TLS_AES_128_GCM_SHA256",
  100. "TLS_CHACHA20_POLY1305_SHA256",
  101. "TLS_AES_128_CCM_SHA256",
  102. "TLS_AES_128_CCM_8_SHA256"
  103. ]}
  104. ]).
  105. -define(TLS_OLD_CIPHERS, [
  106. {versions, ['tlsv1.1', 'tlsv1.2']},
  107. {ciphers, [
  108. "ECDHE-ECDSA-AES256-GCM-SHA384",
  109. "ECDHE-RSA-AES256-GCM-SHA384",
  110. "ECDHE-ECDSA-AES256-SHA384",
  111. "ECDHE-RSA-AES256-SHA384",
  112. "ECDHE-ECDSA-DES-CBC3-SHA",
  113. "ECDH-ECDSA-AES256-GCM-SHA384",
  114. "ECDH-RSA-AES256-GCM-SHA384",
  115. "ECDH-ECDSA-AES256-SHA384",
  116. "ECDH-RSA-AES256-SHA384",
  117. "DHE-DSS-AES256-GCM-SHA384",
  118. "DHE-DSS-AES256-SHA256",
  119. "AES256-GCM-SHA384",
  120. "AES256-SHA256",
  121. "ECDHE-ECDSA-AES128-GCM-SHA256",
  122. "ECDHE-RSA-AES128-GCM-SHA256",
  123. "ECDHE-ECDSA-AES128-SHA256",
  124. "ECDHE-RSA-AES128-SHA256",
  125. "ECDH-ECDSA-AES128-GCM-SHA256",
  126. "ECDH-RSA-AES128-GCM-SHA256",
  127. "ECDH-ECDSA-AES128-SHA256",
  128. "ECDH-RSA-AES128-SHA256",
  129. "DHE-DSS-AES128-GCM-SHA256",
  130. "DHE-DSS-AES128-SHA256",
  131. "AES128-GCM-SHA256",
  132. "AES128-SHA256",
  133. "ECDHE-ECDSA-AES256-SHA",
  134. "ECDHE-RSA-AES256-SHA",
  135. "DHE-DSS-AES256-SHA",
  136. "ECDH-ECDSA-AES256-SHA",
  137. "ECDH-RSA-AES256-SHA",
  138. "AES256-SHA",
  139. "ECDHE-ECDSA-AES128-SHA",
  140. "ECDHE-RSA-AES128-SHA",
  141. "DHE-DSS-AES128-SHA",
  142. "ECDH-ECDSA-AES128-SHA",
  143. "ECDH-RSA-AES128-SHA",
  144. "AES128-SHA"
  145. ]}
  146. ]).
  147. -define(DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT, 1000).
  148. %%------------------------------------------------------------------------------
  149. %% APIs
  150. %%------------------------------------------------------------------------------
  %% Return the sorted, de-duplicated names of every function that `Suite'
  %% exports with arity 1 and whose name begins with "t_".
  all(Suite) ->
      IsTestCase = fun(Name) -> lists:prefix("t_", atom_to_list(Name)) end,
      Exported = Suite:module_info(exports),
      lists:usort([Name || {Name, 1} <- Exported, IsTestCase(Name)]).
  %% If `Module' exports `TestCase/2', call it as `TestCase(init, Config)' and
  %% return its result; otherwise return `Config' unchanged.
  init_per_testcase(Module, TestCase, Config) ->
      HasHook = erlang:function_exported(Module, TestCase, 2),
      if
          HasHook -> Module:TestCase(init, Config);
          true -> Config
      end.
  %% If `Module' exports `TestCase/2', call it as `TestCase('end', Config)'.
  %% The hook's result is discarded; `Config' is always returned.
  end_per_testcase(Module, TestCase, Config) ->
      erlang:function_exported(Module, TestCase, 2) andalso
          Module:TestCase('end', Config),
      Config.
  %% Store `BootMods' under the `boot_modules' key of the `emqx'
  %% application environment.
  -spec boot_modules(all | list(atom())) -> ok.
  boot_modules(BootMods) ->
      application:set_env(emqx, boot_modules, BootMods).
  %% Start `emqx' followed by `Apps' using the default per-app handler.
  -spec start_apps(Apps :: apps()) -> ok.
  start_apps(Apps) ->
      %% The default handler blanks out `db_hostname', which is set when the
      %% `system_monitor' application is loaded in `emqx_machine'; keeping it
      %% would make the app crash while trying to connect.
      %% FIXME: add an `enable' option to sysmon_top and use that to
      %% decide whether to start it or not.
      ResetSysmonDbHost =
          fun(_App) ->
              application:set_env(system_monitor, db_hostname, "")
          end,
      start_apps(Apps, ResetSysmonDbHost, #{}).

  %% Start `emqx' followed by `Apps'; `Handler' is invoked with each app name
  %% right before that app is started.
  -spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok.
  start_apps(Apps, Handler) when is_function(Handler) ->
      start_apps(Apps, Handler, #{}).

  %% Same as start_apps/2, with `Opts' forwarded to every start_app/3 call.
  -spec start_apps(Apps :: apps(), Handler :: special_config_handler(), map()) -> ok.
  start_apps(Apps, Handler, Opts) when is_function(Handler) ->
      AllApps = [emqx | Apps],
      %% Load all application code into the VM first, because applications
      %% such as minirest and ekka scan the loaded modules.
      lists:foreach(fun load/1, AllApps),
      ok = start_ekka(),
      ok = emqx_ratelimiter_SUITE:load_conf(),
      StartOne = fun(App) -> start_app(App, Handler, Opts) end,
      lists:foreach(StartOne, AllApps).
  %% Load the application specification of `App'. An already-loaded
  %% application counts as success; any other failure raises
  %% `{failed_to_load_app, App, Reason}'.
  load(App) ->
      case application:load(App) of
          {error, {already_loaded, _}} -> ok;
          {error, Reason} -> error({failed_to_load_app, App, Reason});
          ok -> ok
      end.
  %% Render and load the configuration of `App' with default options.
  render_and_load_app_config(App) ->
      render_and_load_app_config(App, #{}).

  %% Load `App', locate its config template (the `conf_file_path' option, or
  %% "etc/<conf file>" by default, relative to the app directory) and render
  %% and load it. A `skip' throw from the rendering step yields `ok'; any
  %% other throw is re-raised as an error carrying the template path.
  render_and_load_app_config(App, Opts) ->
      load(App),
      Schema = app_schema(App),
      DefaultRelPath = filename:join(["etc", app_conf_file(App)]),
      Template = app_path(App, maps:get(conf_file_path, Opts, DefaultRelPath)),
      try
          do_render_app_config(App, Schema, Template, Opts)
      catch
          throw:skip ->
              ok;
          throw:Thrown:Stack ->
              %% turn throw into error
              error({Template, Thrown, Stack})
      end.
  %% Render the config template of `App', feed it through the schema, record
  %% the rendered file as the app's config file, and copy certificates.
  do_render_app_config(App, Schema, ConfigFile, Opts) ->
      %% copying acl.conf must happen before read_schema_configs/2 runs
      copy_acl_conf(),
      Rendered = render_config_file(ConfigFile, mustache_vars(App, Opts)),
      read_schema_configs(Schema, Rendered),
      force_set_config_file_paths(App, [Rendered]),
      copy_certs(App, Rendered),
      ok.
  %% Start `App' with default options.
  start_app(App, SpecAppConfig) ->
      start_app(App, SpecAppConfig, #{}).

  %% Render and load the config of `App', run `SpecAppConfig(App)', then start
  %% the application with all its dependencies and run the post-start checks.
  %% Raises `{failed_to_start_app, App, Reason}' when the start fails.
  start_app(App, SpecAppConfig, Opts) ->
      render_and_load_app_config(App, Opts),
      SpecAppConfig(App),
      case application:ensure_all_started(App) of
          {error, Reason} ->
              error({failed_to_start_app, App, Reason});
          {ok, _Started} ->
              ok = ensure_dashboard_listeners_started(App),
              ok = wait_for_app_processes(App),
              ok = perform_sanity_checks(App)
      end.
  %% Block until the processes of `App' are ready. Only `emqx_conf' needs
  %% this: it has a gen_server which initializes its state asynchronously,
  %% so a dummy synchronous call is made to `emqx_cluster_rpc'.
  wait_for_app_processes(App) when App =:= emqx_conf ->
      gen_server:call(emqx_cluster_rpc, dummy),
      ok;
  wait_for_app_processes(_OtherApp) ->
      ok.
  %% Sanity checks meant to expose inter-suite or inter-testcase flakiness
  %% early. For instance, a suite may leave one application running while
  %% stopping others, in which case the `application:start/2' callback of
  %% the forgotten application is never invoked again.
  perform_sanity_checks(emqx_rule_engine = App) ->
      ensure_config_handler(App, [rule_engine, rules, '?']);
  perform_sanity_checks(emqx_bridge = App) ->
      ensure_config_handler(App, [bridges]);
  perform_sanity_checks(_App) ->
      ok.
  %% Assert that the handler registered at `ConfigPath' in the config handler
  %% info belongs to `Module'; raise `config_handler_missing' otherwise.
  ensure_config_handler(Module, ConfigPath) ->
      #{handlers := Handlers} = emqx_config_handler:info(),
      Found = emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found),
      case Found of
          #{'$mod' := Module} ->
              ok;
          _ ->
              error({config_handler_missing, ConfigPath, Module, Found})
      end.
  %% Name of the config template file of `App': "emqx.conf.all" for
  %% `emqx_conf', "<app>.conf" for every other application.
  app_conf_file(App) ->
      case App of
          emqx_conf -> "emqx.conf.all";
          _ -> atom_to_list(App) ++ ".conf"
      end.
  %% Return the schema module of `App', named "<app>_schema", provided its
  %% `roots/0' returns a list. Return `no_schema' when the call raises
  %% `undef'.
  app_schema(App) ->
      SchemaMod = list_to_atom(atom_to_list(App) ++ "_schema"),
      try SchemaMod:roots() of
          Roots ->
              true = is_list(Roots),
              SchemaMod
      catch
          error:undef ->
              no_schema
      end.
  %% Build the variables used to render the config template of `App': the
  %% node cookie plus the app's "data" and "etc" directories, overridden by
  %% whatever `extra_mustache_vars' holds in `Opts'.
  mustache_vars(App, Opts) ->
      Overrides = maps:get(extra_mustache_vars, Opts, #{}),
      maps:merge(
          #{
              node_cookie => atom_to_list(erlang:get_cookie()),
              platform_data_dir => app_path(App, "data"),
              platform_etc_dir => app_path(App, "etc")
          },
          Overrides
      ).
  %% Render the mustache template `ConfigFile' with the variables in `Vars0'
  %% and write the result next to it as "<ConfigFile>.rendered". Returns the
  %% rendered file name. Throws `skip' when the template does not exist and
  %% raises an error for any other read failure.
  render_config_file(ConfigFile, Vars0) ->
      Template =
          case file:read_file(ConfigFile) of
              {error, enoent} -> throw(skip);
              {error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason});
              {ok, Bin} -> Bin
          end,
      ToMustacheVar =
          fun({Name, Value}) -> {atom_to_list(Name), iolist_to_binary(Value)} end,
      Rendered = bbmustache:render(Template, lists:map(ToMustacheVar, maps:to_list(Vars0))),
      RenderedPath = ConfigFile ++ ".rendered",
      ok = file:write_file(RenderedPath, Rendered),
      RenderedPath.
  %% Generate application env from `ConfigFile' through `Schema' and apply it
  %% with application:set_env/1. Nothing is done for `no_schema'.
  read_schema_configs(Schema, ConfigFile) ->
      case Schema of
          no_schema ->
              ok;
          _ ->
              application:set_env(generate_config(Schema, ConfigFile))
      end.
  %% Load `ConfigFile' as a hocon rich map and run it through
  %% hocon_tconf:generate/2 for `SchemaModule'.
  generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
      {ok, RichConf} = hocon:load(ConfigFile, #{format => richmap}),
      hocon_tconf:generate(SchemaModule, RichConf).
  %% Stop `Apps' and the core applications with default options.
  -spec stop_apps(list()) -> ok.
  stop_apps(Apps) ->
      stop_apps(Apps, #{}).

  %% Stop `Apps' followed by emqx, ekka, mria and mnesia, delete the mnesia
  %% schema, and reset the application env touched by the start helpers.
  %%
  %% Opts:
  %%   erase_all_configs => false   keep the contents of emqx_config
  %%                                (by default they are erased).
  -spec stop_apps(list(), map()) -> ok.
  stop_apps(Apps, Opts) ->
      %% Stop results are ignored on purpose: some apps may not be running.
      lists:foreach(fun application:stop/1, Apps ++ [emqx, ekka, mria, mnesia]),
      ok = mria_mnesia:delete_schema(),
      %% to avoid inter-suite flakiness
      application:unset_env(emqx, config_loader),
      application:unset_env(emqx, boot_modules),
      persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
      case Opts of
          #{erase_all_configs := false} ->
              %% FIXME: this means inter-suite or inter-test dependencies
              ok;
          _ ->
              emqx_config:erase_all()
      end,
      ok = emqx_config:delete_override_conf_files(),
      application:unset_env(emqx, local_override_conf_file),
      application:unset_env(emqx, cluster_override_conf_file),
      %% force_set_config_file_paths/2 sets `cluster_conf_file'; unset it as
      %% well so that it does not leak into the next suite.
      application:unset_env(emqx, cluster_conf_file),
      application:unset_env(emqx, cluster_hocon_file),
      application:unset_env(gen_rpc, port_discovery),
      ok.
  %% Project root directory: the path of the `emqx' app directory truncated
  %% just before its first "_build" component.
  proj_root() ->
      Components = filename:split(app_path(emqx, ".")),
      NotBuildDir = fun(Part) -> iolist_to_binary(Part) =/= <<"_build">> end,
      filename:join(lists:takewhile(NotBuildDir, Components)).
  %% Alias of app_path/2, kept for backward compatibility.
  deps_path(App, RelPath) ->
      app_path(App, RelPath).
  %% Join `RelativePath' onto the library directory of `App' and sanitize the
  %% result with safe_relative_path/1.
  app_path(App, RelativePath) ->
      LibDir = code:lib_dir(App),
      Joined = filename:join([LibDir, RelativePath]),
      safe_relative_path(Joined).
  %% Sanitize `Path'. For a path rooted at "/", the leading "/" is stripped,
  %% the remainder is sanitized and the "/" is put back; any other path is
  %% sanitized as is.
  safe_relative_path(Path) ->
      case filename:split(Path) of
          ["/" | Rest] ->
              Sanitized = do_safe_relative_path(filename:join(Rest)),
              filename:join(["/", Sanitized]);
          _NotRooted ->
              do_safe_relative_path(Path)
      end.
  %% Sanitize `Path' with safe_relative_path_2/1, falling back to the
  %% original path when the sanitizer reports `unsafe'.
  do_safe_relative_path(Path) ->
      Sanitized = safe_relative_path_2(Path),
      if
          Sanitized =:= unsafe -> Path;
          true -> Sanitized
      end.
  %% Version shim: from OTP 23 on, use filelib:safe_relative_path/2 resolved
  %% against the current working directory; on older releases use
  %% filename:safe_relative_path/1.
  -if(?OTP_RELEASE >= 23).
  safe_relative_path_2(RelPath) ->
      {ok, Cwd} = file:get_cwd(),
      filelib:safe_relative_path(RelPath, Cwd).
  -else.
  safe_relative_path_2(RelPath) ->
      filename:safe_relative_path(RelPath).
  -endif.
  %% Restart `App': stop it, then render its config, run
  %% `SpecAppConfigHandler(App)' and start it again through start_app/3.
  %%
  %% Returns `ok', as the spec promises. A trailing `application:start(App)'
  %% used to follow start_app/3; because the app was already running at that
  %% point it always evaluated to `{error, {already_started, App}}', which
  %% then became the return value.
  -spec reload(App :: atom(), SpecAppConfig :: special_config_handler()) -> ok.
  reload(App, SpecAppConfigHandler) ->
      %% The stop result is ignored on purpose: the app may not be running.
      application:stop(App),
      %% start_app/3 either returns `ok' or raises.
      ok = start_app(App, SpecAppConfigHandler, #{}).
  %% Stop mria, then delete the mnesia schema. The result of the schema
  %% deletion is returned.
  ensure_mnesia_stopped() ->
      _StopResult = mria:stop(),
      mria_mnesia:delete_schema().
  %% Helper that waits until `F' yields `true'. `F' is polled in a monitored
  %% helper process; `Fn' and `Ln' are only used to tag the raised errors.
  wait_for(Fn, Ln, F, Timeout) ->
      Poller = fun() -> wait_loop(F, catch_call(F)) end,
      {Pid, Mref} = erlang:spawn_monitor(Poller),
      wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
  %% Drain the mailbox of the calling process without blocking and return
  %% the messages in arrival order.
  flush() ->
      flush([]).

  %% Same as flush/0, with `Collected' holding already drained messages in
  %% reverse arrival order.
  flush(Collected) ->
      receive
          Msg ->
              flush([Msg | Collected])
      after 0 ->
          lists:reverse(Collected)
      end.
  %% Two-way TLS client options with the default cipher configuration.
  client_ssl_twoway() ->
      client_ssl_twoway(default).

  %% Two-way TLS client options: the client certificate options followed by
  %% the version and cipher options selected by `TLSVsn'.
  client_ssl_twoway(TLSVsn) ->
      Certs = client_certs(),
      Ciphers = ciphers(TLSVsn),
      Certs ++ Ciphers.
  %% Client certificate options, with every file name resolved against the
  %% `emqx' application directory.
  client_certs() ->
      Resolve = fun({Key, FilePath}) -> {Key, app_path(emqx, FilePath)} end,
      lists:map(Resolve, ?MQTT_SSL_CLIENT_CERTS).
  %% One-way TLS client options with the default cipher configuration.
  client_ssl() ->
      client_ssl(default).

  %% One-way TLS client options: the version and cipher options selected by
  %% `TLSVsn', followed by `{reuse_sessions, true}'.
  client_ssl(TLSVsn) ->
      lists:append(ciphers(TLSVsn), [{reuse_sessions, true}]).
  %% Version and cipher options for `TLSVsn'. `default' contributes nothing,
  %% leaving the choice to the config file defaults; 'tlsv1.3' selects the
  %% TLS 1.3 suites and anything else the older suites.
  ciphers(TLSVsn) ->
      case TLSVsn of
          default -> [];
          'tlsv1.3' -> ?TLS_1_3_CIPHERS;
          _OlderTLSVsn -> ?TLS_OLD_CIPHERS
      end.
  %% Wait up to one second for a `{publish, Msg}' message whose payload
  %% equals `Payload'. On timeout the test fails, reporting the drained
  %% mailbox.
  wait_mqtt_payload(Payload) ->
      receive
          {publish, #{payload := Received}} when Received =:= Payload ->
              ct:pal("OK - received msg: ~p~n", [Received])
      after 1000 ->
          Mailbox = flush(),
          ct:fail({timeout, Payload, {msg_box, Mailbox}})
      end.
  %% Assert that no `{publish, Msg}' message whose payload equals `Payload'
  %% arrives within one second; the test fails if one does.
  not_wait_mqtt_payload(Payload) ->
      receive
          {publish, #{payload := Received}} when Received =:= Payload ->
              ct:fail({received, Received})
      after 1000 ->
          ct:pal("OK - msg ~p is not received", [Payload])
      end.
  %% Wait for the polling process `Pid' (monitored through `Mref') to exit
  %% and translate its exit reason:
  %%   normal               -> returns ok
  %%   {unexpected, Result} -> error {unexpected, Fn, Ln, Result}
  %%   {crashed, {C, E, S}} -> re-raised as class C with {Fn, Ln, E}
  %% On the first timeout the process is asked to stop and gets another
  %% `Timeout' to report; on the second it is killed and a timeout error is
  %% raised.
  wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
      receive
          {'DOWN', Mref, process, Pid, normal} ->
              ok;
          {'DOWN', Mref, process, Pid, {unexpected, Result}} ->
              erlang:error({unexpected, Fn, Ln, Result});
          {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
              erlang:raise(C, {Fn, Ln, E}, S)
      after Timeout ->
          if
              Kill =:= true ->
                  erlang:demonitor(Mref, [flush]),
                  erlang:exit(Pid, kill),
                  erlang:error({Fn, Ln, timeout});
              Kill =:= false ->
                  Pid ! stop,
                  wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
          end
      end.
  %% Polling loop run in the helper process. Exits normally once the last
  %% result is `ok'; otherwise re-evaluates `F' every 100 ms. A `stop'
  %% message makes the process exit with the last result as exit reason.
  wait_loop(_F, ok) ->
      exit(normal);
  wait_loop(F, LastRes) ->
      receive
          stop ->
              erlang:exit(LastRes)
      after 100 ->
          wait_loop(F, catch_call(F))
      end.
  %% Evaluate `F()' and classify the outcome:
  %%   true          -> ok
  %%   other value   -> {unexpected, Value}
  %%   any exception -> {crashed, {Class, Reason, Stacktrace}}
  catch_call(F) ->
      try F() of
          true -> ok;
          Other -> {unexpected, Other}
      catch
          Class:Reason:Stack ->
              {crashed, {Class, Reason, Stack}}
      end.
  %% Point the `emqx' application at the given config files.
  %%   emqx_conf: append a `node.config_files' entry to the single file in
  %%              `Paths', then set `config_files'.
  %%   emqx:      also set the override and cluster conf file names; the
  %%              cluster conf must be initialized so it can be saved to file.
  %%   others:    nothing to do.
  force_set_config_file_paths(emqx_conf, [Path] = Paths) ->
      Line = io_lib:format("node.config_files = [~p]~n", [Path]),
      ok = file:write_file(Path, iolist_to_binary(Line), [append]),
      application:set_env(emqx, config_files, Paths);
  force_set_config_file_paths(emqx, Paths) ->
      lists:foreach(
          fun({Key, File}) -> application:set_env(emqx, Key, File) end,
          [
              {local_override_conf_file, "local_override.conf"},
              {cluster_override_conf_file, "cluster_override.conf"},
              {cluster_conf_file, "cluster.hocon"}
          ]
      ),
      application:set_env(emqx, config_files, Paths);
  force_set_config_file_paths(_App, _Paths) ->
      ok.
  %% For `emqx_conf', copy the "certs" directory that sits next to the
  %% corresponding `emqx' path into the directory of the rendered config
  %% file. Nothing is done for other apps.
  %% NOTE(review): the paths are interpolated into a shell command unquoted.
  copy_certs(emqx_conf, RenderedFile) ->
      DestDir = filename:dirname(RenderedFile),
      SrcDir = string:replace(DestDir, "emqx_conf", "emqx"),
      Cmd = ["cp -rf ", SrcDir, "/certs ", DestDir, "/"],
      os:cmd(Cmd),
      ok;
  copy_certs(_App, _RenderedFile) ->
      ok.
  %% Make sure "etc/acl.conf" exists under the `emqx' lib dir. When
  %% `emqx_authz' is available its acl.conf is copied over; otherwise an
  %% empty file is created unless a regular file is already there.
  copy_acl_conf() ->
      Dest = filename:join([code:lib_dir(emqx), "etc/acl.conf"]),
      case code:lib_dir(emqx_authz) of
          {error, bad_name} ->
              case filelib:is_regular(Dest) of
                  true -> ok;
                  false -> file:write_file(Dest, <<"">>)
              end;
          _AuthzDir ->
              {ok, _} = file:copy(deps_path(emqx_authz, "etc/acl.conf"), Dest)
      end,
      ok.
  %% Initialize emqx_config from `Config' under `SchemaModule'. A map is
  %% JSON-encoded first; any other value is passed through as is. Override
  %% conf files are deleted and acl.conf is put in place beforehand.
  load_config(SchemaModule, Config) ->
      ConfigBin =
          if
              is_map(Config) -> emqx_utils_json:encode(Config);
              true -> Config
          end,
      ok = emqx_config:delete_override_conf_files(),
      ok = copy_acl_conf(),
      ok = emqx_config:init_load(SchemaModule, ConfigBin).
  %% Probe every `{Host, Port}' in `Servers' and return `true' only when all
  %% of them accept a TCP connection. The unavailable ones are logged.
  -spec is_all_tcp_servers_available(Servers) -> Result when
      Servers :: [{Host, Port}],
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number(),
      Result :: boolean().
  is_all_tcp_servers_available(Servers) ->
      IsDown =
          fun({Host, Port}) ->
              not is_tcp_server_available(Host, Port)
          end,
      case lists:filter(IsDown, Servers) of
          [] ->
              true;
          Unavail ->
              ct:pal("Unavailable servers: ~p", [Unavail]),
              false
      end.
  %% Check whether a TCP connection to `Host':`Port' can be established
  %% within the default timeout (?DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT).
  %% The return type in the spec is `boolean()'; it used to be the bare atom
  %% `boolean', which does not describe the actual `true | false' result.
  -spec is_tcp_server_available(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number()
  ) -> boolean().
  is_tcp_server_available(Host, Port) ->
      is_tcp_server_available(Host, Port, ?DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT).
  %% Check whether a TCP connection to `Host':`Port' can be established
  %% within `Timeout' milliseconds. The probe socket is closed right away.
  %% Returns `true' on success and `false' on any connect error.
  %% The return type in the spec is `boolean()'; it used to be the bare atom
  %% `boolean', which does not describe the actual `true | false' result.
  -spec is_tcp_server_available(
      Host :: inet:socket_address() | inet:hostname(),
      Port :: inet:port_number(),
      Timeout :: integer()
  ) -> boolean().
  is_tcp_server_available(Host, Port, Timeout) ->
      case gen_tcp:connect(Host, Port, [], Timeout) of
          {ok, Socket} ->
              gen_tcp:close(Socket),
              true;
          {error, _} ->
              false
      end.
  %% Start ekka. When the `mnesia_hook' module cannot be called, fall back to
  %% the Mnesia DB backend by setting the `db_backend' env of `mria' first.
  start_ekka() ->
      HasMnesiaHook =
          try mnesia_hook:module_info() of
              _ -> true
          catch
              _:_ -> false
          end,
      HasMnesiaHook orelse application:set_env(mria, db_backend, mnesia),
      ekka:start().
  %% For `emqx_dashboard', assert that its listener reports ready (called
  %% with `infinity'). Nothing is checked for other apps.
  ensure_dashboard_listeners_started(App) when App =:= emqx_dashboard ->
      true = emqx_dashboard_listener:is_ready(infinity),
      ok;
  ensure_dashboard_listeners_started(_OtherApp) ->
      ok.
  %% Ensure a QUIC listener `Name' bound to `UdpPort' with default settings.
  -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number()) -> ok.
  ensure_quic_listener(Name, UdpPort) ->
      ensure_quic_listener(Name, UdpPort, #{}).

  %% Ensure a QUIC listener `Name' bound to `UdpPort'. `ExtraSettings' is
  %% merged over the default listener config, which is then stored under
  %% [listeners, quic, Name] before the listener is started. An
  %% already-started listener counts as success; any other start result is
  %% thrown.
  -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number(), map()) -> ok.
  ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
      application:ensure_all_started(quicer),
      Defaults = #{
          acceptors => 16,
          bind => UdpPort,
          ciphers =>
              [
                  "TLS_AES_256_GCM_SHA384",
                  "TLS_AES_128_GCM_SHA256",
                  "TLS_CHACHA20_POLY1305_SHA256"
              ],
          enable => true,
          idle_timeout => 15000,
          ssl_options => #{
              certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
              keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem")
          },
          limiter => #{},
          max_connections => 1024000,
          mountpoint => <<>>,
          zone => default
      },
      ListenerConf = maps:merge(Defaults, ExtraSettings),
      emqx_config:put([listeners, quic, Name], ListenerConf),
      ListenerId = emqx_listeners:listener_id(quic, Name),
      case emqx_listeners:start_listener(ListenerId) of
          {error, {already_started, _Pid}} -> ok;
          ok -> ok;
          Other -> throw(Other)
      end.
  568. %%
  569. %% Clusterisation and multi-node testing
  570. %%
  571. -type cluster_spec() :: [node_spec()].
  572. -type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
  573. -type role() :: core | replicant.
  574. -type shortname() :: atom().
  575. -type nodename() :: atom().
  576. -type node_opts() :: #{
  577. %% Apps to load. These apps will be loaded once the node has started
  578. load_apps => list(),
  579. %% Apps to start. This is the first arg passed to emqx_common_test_helpers:start_apps/2
  580. apps => list(),
  581. %% Extra app-start handler. This is the second arg passed to emqx_common_test_helpers:start_apps/2
  582. env_handler => fun((AppName :: atom()) -> term()),
  583. %% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
  584. env => [{AppName :: atom(), Key :: atom(), Val :: term()}],
  585. %% Whether to execute `emqx_config:init_load(SchemaMod)`
  586. %% default: true
  587. load_schema => boolean(),
  588. %% Which node in the cluster to join to.
  589. %% default: first core node
  590. join_to => node(),
  591. %% If we want to exercise the scenario where a node joins an
  592. %% existing cluster where there has already been some
  593. %% configuration changes (via cluster rpc), then we need to enable
  594. %% autocluster so that the joining node will restart the
  595. %% `emqx_conf' app and correctly catch up the config.
  596. start_autocluster => boolean(),
  597. %% Eval by emqx_config:put/2
  598. conf => [{KeyPath :: list(), Val :: term()}],
  599. %% Fast option to config listener port
  600. %% default rule:
  601. %% - tcp: base_port
  602. %% - ssl: base_port + 1
  603. %% - ws : base_port + 3
  604. %% - wss: base_port + 4
  605. listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
  606. }.
  %% Build the node options for a cluster described by `Specs', without any
  %% common options.
  -spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
  emqx_cluster(Specs) ->
      emqx_cluster(Specs, #{}).

  %% Build `{Name, NodeOpts}' for every node in `Specs0'. `CommonOpts' (a map
  %% or a proplist) is handed to expand_node_specs/2. Each node's options are
  %% merged from per-node defaults (base port, core node list, gen_rpc port
  %% map), role-specific options, and finally the node's own options.
  -spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
  emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
      emqx_cluster(Specs, maps:from_list(CommonOpts));
  emqx_cluster(Specs0, CommonOpts) ->
      %% Number the specs starting from 1; the number drives the base port.
      Numbered = lists:zip(Specs0, lists:seq(1, length(Specs0))),
      Specs = expand_node_specs(Numbered, CommonOpts),
      %% Assign gen_rpc ports
      GenRpcPorts = maps:from_list([
          {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
       || {{_, Name, _}, Num} <- Specs
      ]),
      %% The first core node (if any) is the default node to join.
      CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
      JoinTo =
          case CoreNodes of
              [] -> undefined;
              [FirstCore | _] -> FirstCore
          end,
      RoleOpts = fun
          (replicant) ->
              #{
                  env => [
                      {mria, node_role, replicant},
                      {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
                  ]
              };
          (core) ->
              #{
                  join_to => JoinTo,
                  env => [
                      {mria, node_role, core}
                  ]
              }
      end,
      [
          begin
              PerNode = #{
                  base_port => base_port(Number),
                  env => [
                      {mria, core_nodes, CoreNodes},
                      {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
                  ]
              },
              {Name, merge_opts(merge_opts(PerNode, RoleOpts(Role)), Opts)}
          end
       || {{Role, Name, Opts}, Number} <- Specs
      ].
  %% Lower level starting API: start the peer node `Name' with the module
  %% given by the `peer_mod' option (`ct_slave' by default, or `slave'),
  %% make sure it is reachable, forward snabbkaffe traces from it, and set
  %% it up with setup_node/2. Returns the full node name. `Opts' may be a
  %% map or a proplist.
  -spec start_slave(shortname(), node_opts()) -> nodename().
  start_slave(Name, Opts) when is_list(Opts) ->
      start_slave(Name, maps:from_list(Opts));
  start_slave(Name, Opts) when is_map(Opts) ->
      PeerMod = maps:get(peer_mod, Opts, ct_slave),
      Node = node_name(Name),
      put_peer_mod(Node, PeerMod),
      Cookie = atom_to_list(erlang:get_cookie()),
      PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
      NodeDataDir = filename:join([
          PrivDataDir,
          Node,
          integer_to_list(erlang:unique_integer())
      ]),
      StartResult =
          case PeerMod of
              ct_slave ->
                  ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]),
                  CtSlaveOpts = [
                      {kill_if_fail, true},
                      {monitor_master, true},
                      {init_timeout, 20_000},
                      {startup_timeout, 20_000},
                      {erl_flags, erl_flags()},
                      {env, [
                          {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
                          {"EMQX_NODE__COOKIE", Cookie},
                          {"EMQX_NODE__DATA_DIR", NodeDataDir}
                      ]}
                  ],
                  ct_slave:start(Node, CtSlaveOpts);
              slave ->
                  Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_",
                  slave:start_link(host(), Name, ebin_path() ++ Env)
          end,
      %% A node that started but did not connect is accepted here; the ping
      %% below decides whether it is reachable.
      case StartResult of
          {ok, _} ->
              ok;
          {error, started_not_connected, _} ->
              ok;
          Other ->
              throw(Other)
      end,
      pong = net_adm:ping(Node),
      ok = snabbkaffe:forward_trace(Node),
      setup_node(Node, Opts),
      Node.
  %% Node stopping: stop the peer node with the module recorded for it when
  %% it was started, and forget that record. A node that was never started
  %% counts as stopped.
  stop_slave(Node0) ->
      Node = node_name(Node0),
      PeerMod = get_peer_mod(Node),
      erase_peer_mod(Node),
      StopResult = PeerMod:stop(Node),
      case StopResult of
          {error, not_started, _} -> ok;
          {ok, _} -> ok;
          ok -> ok
      end.
  %% EPMD starting: launch the epmd executable as a daemon. The command is
  %% expected to print nothing.
  start_epmd() ->
      Cmd = "\"" ++ epmd_path() ++ "\" -daemon",
      [] = os:cmd(Cmd),
      ok.
  %% Locate the `epmd' executable via os:find_executable/1. Logs a critical
  %% message and exits with `epmd_not_found' when there is none.
  epmd_path() ->
      Found = os:find_executable("epmd"),
      if
          Found =:= false ->
              ct:pal(critical, "Could not find epmd.~n"),
              exit(epmd_not_found);
          true ->
              Found
      end.
  731. %% Node initialization
  %% @doc Prepare an already-running peer node for a test run.
  %%
  %% `Opts' may be a proplist or a map. All keys are optional; the defaults
  %% are the ones given to `maps:get/3' below (`apps', `start_apps',
  %% `join_to', `env_handler', `configure_gen_rpc', `load_schema',
  %% `schema_mod', `load_apps', `env', `conf', `listener_ports',
  %% `priv_data_dir', `start_autocluster' and `base_port').
  %%
  %% Steps, in order: load application envs on the peer, pick a mnesia dir,
  %% optionally configure gen_rpc ports, apply `env', optionally start the
  %% apps, and optionally join the cluster given by `join_to'.
  %%
  %% Raises `{failed_to_join_cluster, _}' (after stopping the node) when
  %% `ekka:join/1' returns anything other than `ok' or `ignore'.
  -spec setup_node(nodename(), node_opts()) -> ok.
  setup_node(Node, Opts) when is_list(Opts) ->
      setup_node(Node, maps:from_list(Opts));
  setup_node(Node, Opts) when is_map(Opts) ->
      %% Default base port is derived from a hash of Node:
      %% from 1100 to 65520 with step 10
      BasePort = maps:get(base_port, Opts, 1100 + erlang:phash2(Node, 6553 - 110) * 10),
      Apps = maps:get(apps, Opts, []),
      StartApps = maps:get(start_apps, Opts, true),
      JoinTo = maps:get(join_to, Opts, undefined),
      EnvHandler = maps:get(env_handler, Opts, fun(_) -> ok end),
      ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true),
      LoadSchema = maps:get(load_schema, Opts, true),
      SchemaMod = maps:get(schema_mod, Opts, emqx_schema),
      LoadApps = maps:get(load_apps, Opts, Apps),
      Env = maps:get(env, Opts, []),
      Conf = maps:get(conf, Opts, []),
      ListenerPorts = maps:get(listener_ports, Opts, [
          {Type, listener_port(BasePort, Type)}
       || Type <- [tcp, ssl, ws, wss]
      ]),
      %% we need a fresh data dir for each peer node to avoid unintended
      %% successes due to sharing of data in the cluster.
      PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
      %% If we want to exercise the scenario where a node joins an
      %% existing cluster where there has already been some
      %% configuration changes (via cluster rpc), then we need to enable
      %% autocluster so that the joining node will restart the
      %% `emqx_conf' app and correctly catch up the config.
      StartAutocluster = maps:get(start_autocluster, Opts, false),

      ct:pal(
          "setting up node ~p:\n ~p",
          [
              Node,
              #{
                  start_autocluster => StartAutocluster,
                  load_apps => LoadApps,
                  apps => Apps,
                  env => Env,
                  join_to => JoinTo,
                  start_apps => StartApps
              }
          ]
      ),

      %% Load env before doing anything to avoid overriding
      [ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]],

      %% Ensure a clean mnesia directory for each run to avoid
      %% inter-test flakiness.
      MnesiaDataDir = filename:join([
          PrivDataDir,
          Node,
          integer_to_list(erlang:unique_integer()),
          "mnesia"
      ]),
      case erpc:call(Node, application, get_env, [mnesia, dir, undefined]) of
          undefined ->
              ct:pal("~p: setting mnesia dir: ~p", [Node, MnesiaDataDir]),
              erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]);
          PreviousMnesiaDir ->
              ct:pal("~p: mnesia dir already set: ~p", [Node, PreviousMnesiaDir]),
              ok
      end,

      %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
      %% in emqx_common_test_helpers:start_apps(...)
      ConfigureGenRpc andalso
          begin
              ok = rpc:call(Node, application, set_env, [
                  gen_rpc, tcp_server_port, gen_rpc_port(BasePort)
              ]),
              ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual])
          end,

      %% Setting env before starting any applications
      set_envs(Node, Env),

      %% Named after the peer (`Node'), like the mnesia dir above, so the
      %% directory identifies the node that owns the data rather than the
      %% node that happens to be running this setup code.
      NodeDataDir = filename:join([
          PrivDataDir,
          Node,
          integer_to_list(erlang:unique_integer())
      ]),

      %% Here we start the apps
      EnvHandlerForRpc =
          fun(App) ->
              %% We load configuration, and then set the special environment variable
              %% which says that emqx shouldn't load configuration at startup
              %% Otherwise, configuration gets loaded and all preset env in EnvHandler is lost
              LoadSchema andalso
                  begin
                      %% to avoid sharing data between executions and/or
                      %% nodes. these variables might not be in the
                      %% config file (e.g.: emqx_enterprise_schema).
                      Cookie = atom_to_list(erlang:get_cookie()),
                      set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
                      set_env_once("EMQX_NODE__COOKIE", Cookie),
                      emqx_config:init_load(SchemaMod),
                      emqx_app:set_config_loader(emqx_conf)
                  end,

              %% Need to set this otherwise listeners will conflict between each other
              [
                  ok = emqx_config:put([listeners, Type, default, bind], {
                      {127, 0, 0, 1}, Port
                  })
               || {Type, Port} <- ListenerPorts
              ],

              [ok = emqx_config:put(KeyPath, Value) || {KeyPath, Value} <- Conf],
              ok = EnvHandler(App),
              ok
          end,

      StartApps andalso
          begin
              ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandlerForRpc])
          end,

      %% Join the cluster if JoinTo is specified
      case JoinTo of
          undefined ->
              ok;
          _ ->
              StartAutocluster andalso
                  begin
                      %% Note: we need to re-set the env because
                      %% starting the apps apparently make some of them
                      %% to be lost... This is particularly useful for
                      %% setting extra apps to be restarted after
                      %% joining.
                      set_envs(Node, Env),
                      ok = erpc:call(Node, emqx_machine_boot, start_autocluster, [])
                  end,
              case rpc:call(Node, ekka, join, [JoinTo]) of
                  ok ->
                      ok;
                  ignore ->
                      ok;
                  Err ->
                      %% Do not leave a half-configured peer running.
                      stop_slave(Node),
                      error({failed_to_join_cluster, #{node => Node, error => Err}})
              end
      end,
      ok.
  867. %% Helpers
  %% @doc Set the OS environment variable `Var' to `Value' only when it is
  %% not already set; an existing value is left untouched. Always returns `ok'.
  set_env_once(Var, Value) ->
      AlreadySet = os:getenv(Var) =/= false,
      AlreadySet orelse os:putenv(Var, Value),
      ok.
  %% @doc Remember, in the calling process' dictionary, which module was
  %% used to start the peer `Node'.
  put_peer_mod(Node, SlaveMod) ->
      Key = {?MODULE, Node},
      _Previous = put(Key, SlaveMod),
      ok.
  %% @doc Look up the module recorded for `Node' in the calling process'
  %% dictionary; falls back to `ct_slave' when nothing was recorded.
  get_peer_mod(Node) ->
      Stored = get({?MODULE, Node}),
      if
          Stored =:= undefined -> ct_slave;
          true -> Stored
      end.
  %% @doc Forget the module recorded for `Node'; returns the erased value,
  %% or `undefined' when nothing was recorded.
  erase_peer_mod(Node) ->
      Key = {?MODULE, Node},
      erase(Key).
  %% @doc Turn `Name' into a full node name. A name that already has the
  %% `name@host' shape is returned unchanged; otherwise the local host part
  %% (see `host/0') is appended.
  node_name(Name) ->
      NameStr = atom_to_list(Name),
      case string:tokens(NameStr, "@") of
          [_Short, _Host] ->
              %% the name already has a @
              Name;
          _Other ->
              list_to_atom(NameStr ++ "@" ++ host())
      end.
  %% @doc Build the short node name `autocluster_node<Num>' as an atom.
  gen_node_name(Num) ->
      NameStr = lists:append("autocluster_node", integer_to_list(Num)),
      list_to_atom(NameStr).
  %% @doc Return the host part (the text after `@') of the local node name.
  host() ->
      Self = atom_to_list(node()),
      [_Name, Host] = string:tokens(Self, "@"),
      Host.
  %% @doc Merge two option maps. For keys present in both maps, the list
  %% options `env', `conf', `apps' and `load_apps' are concatenated, sorted
  %% and de-duplicated; for any other key the value from `Opts2' wins.
  merge_opts(Opts1, Opts2) ->
      ListKeys = [env, conf, apps, load_apps],
      Combine =
          fun(Key, Old, New) ->
              case lists:member(Key, ListKeys) of
                  true -> lists:usort(New ++ Old);
                  false -> New
              end
          end,
      maps:merge_with(Combine, Opts1, Opts2).
  %% @doc Apply every `{Application, Key, Value}' triple in `Env' on `Node'
  %% via `application:set_env/3', failing on the first call that does not
  %% return `ok'.
  set_envs(_Node, []) ->
      ok;
  set_envs(Node, [{Application, Key, Value} | Rest]) ->
      ok = rpc:call(Node, application, set_env, [Application, Key, Value]),
      set_envs(Node, Rest).
  %% @doc Command-line flags for a peer node: a single scheduler, the local
  %% node as `-master' (so logs are redirected to it), and the code paths
  %% produced by `ebin_path/0'.
  erl_flags() ->
      Master = atom_to_list(node()),
      %% One core and redirecting logs to master
      lists:append(["+S 1:1 -master ", Master, " ", ebin_path()]).
  %% @doc Build a `-pa Dir1 Dir2 ...' argument string from the entries of
  %% the current code path that `is_lib/1' accepts.
  ebin_path() ->
      Selected = [Dir || Dir <- code:get_path(), is_lib(Dir)],
      string:join(["-pa" | Selected], " ").
  %% @doc Code-path filter: `true' when `Path' is neither under the OTP
  %% library directory (`code:lib_dir/0') nor contains
  %% `_build/default/plugins'.
  is_lib(Path) ->
      case string:prefix(Path, code:lib_dir()) of
          nomatch ->
              string:str(Path, "_build/default/plugins") =:= 0;
          _UnderOtpLibDir ->
              false
      end.
  926. %% Ports
  %% @doc Base port for node number `Number': 10000 plus 100 per node.
  base_port(Number) ->
      Offset = Number * 100,
      10000 + Offset.
  %% @doc The gen_rpc port sits immediately below the node's base port.
  gen_rpc_port(BasePort) ->
      Port = BasePort - 1,
      Port.
  %% @doc Port of the listener `Type' for a node. The first argument is
  %% either the base port itself or an options map holding `base_port'.
  %% Offsets from the base port: tcp 0, ssl 1, quic 2, ws 3, wss 4.
  listener_port(#{} = Opts, Type) ->
      listener_port(maps:get(base_port, Opts), Type);
  listener_port(BasePort, tcp) -> BasePort;
  listener_port(BasePort, ssl) -> BasePort + 1;
  listener_port(BasePort, quic) -> BasePort + 2;
  listener_port(BasePort, ws) -> BasePort + 3;
  listener_port(BasePort, wss) -> BasePort + 4.
  944. %% Autocluster helpers
  %% @doc Expand a list of `{Spec, Num}' pairs into `{{Role, Name, Opts}, Num}'.
  %%
  %% Accepted `Spec' shapes: the bare atoms `core' / `replicant',
  %% `{Role, Name}' with an atom name, `{Role, Opts}' and
  %% `{Role, Name, Opts}'. `Opts' may be a proplist (converted to a map) or
  %% is passed to `merge_opts/2' as-is; it is merged on top of `CommonOpts'.
  %% When no name is given, `gen_node_name(Num)' supplies one.
  expand_node_specs(Specs, CommonOpts) ->
      ToMap =
          fun
              (Proplist) when is_list(Proplist) -> maps:from_list(Proplist);
              (Other) -> Other
          end,
      Expand =
          fun({Spec, Num}) ->
              Expanded =
                  case Spec of
                      {Role, Name} when is_atom(Name) ->
                          {Role, Name, CommonOpts};
                      {Role, Opts} ->
                          OptsMap = ToMap(Opts),
                          {Role, gen_node_name(Num), merge_opts(CommonOpts, OptsMap)};
                      {Role, Name, Opts} ->
                          OptsMap = ToMap(Opts),
                          {Role, Name, merge_opts(CommonOpts, OptsMap)};
                      Role when Role =:= core; Role =:= replicant ->
                          {Role, gen_node_name(Num), CommonOpts}
                  end,
              {Expanded, Num}
          end,
      lists:map(Expand, Specs).
  %% Useful when iterating on the tests in a loop, to get rid of all the
  %% garbage printed before the test itself begins. Writes two escape
  %% sequences to both standard output and standard error.
  clear_screen() ->
      Sequences = ["\033[H\033[2J", "\033[H\033[3J"],
      lists:foreach(
          fun(Seq) ->
              io:format(standard_io, Seq, []),
              io:format(standard_error, Seq, [])
          end,
          Sequences
      ),
      ok.
  %% @doc Run `Fun' while `Mod:FnName' is mocked with `MockedFn', and
  %% return what `Fun' returns. The mock is created with `passthrough', and
  %% the module is always unloaded afterwards.
  %%
  %% The expectation is installed inside the `try' so that a failing
  %% `meck:expect/3' cannot leave `Mod' mocked for subsequent tests.
  with_mock(Mod, FnName, MockedFn, Fun) ->
      ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
      try
          ok = meck:expect(Mod, FnName, MockedFn),
          Fun()
      after
          ok = meck:unload(Mod)
      end.
  988. %%-------------------------------------------------------------------------------
  989. %% Toxiproxy utils
  990. %%-------------------------------------------------------------------------------
  %% @doc POST an empty body to the proxy's `/reset' endpoint and require a
  %% 204 response.
  reset_proxy(ProxyHost, ProxyPort) ->
      Url = lists:append(["http://", ProxyHost, ":", integer_to_list(ProxyPort), "/reset"]),
      Request = {Url, [], "application/json", <<>>},
      {ok, {{_, 204, _}, _, _}} = httpc:request(post, Request, [], [{body_format, binary}]).
  %% @doc Run `Fun' with the given failure enabled on proxy `Name', and
  %% heal the failure afterwards whether or not `Fun' raises.
  with_failure(FailureType, Name, ProxyHost, ProxyPort, Fun) ->
      _ = enable_failure(FailureType, Name, ProxyHost, ProxyPort),
      try
          Fun()
      after
          heal_failure(FailureType, Name, ProxyHost, ProxyPort)
      end.
  %% @doc Inject a failure into proxy `Name'. `FailureType' is one of
  %% `down' (proxy switched off), `timeout' or `latency_up'.
  enable_failure(FailureType, Name, ProxyHost, ProxyPort) ->
      case FailureType of
          latency_up ->
              latency_up_proxy(on, Name, ProxyHost, ProxyPort);
          timeout ->
              timeout_proxy(on, Name, ProxyHost, ProxyPort);
          down ->
              switch_proxy(off, Name, ProxyHost, ProxyPort)
      end.
  %% @doc Undo the failure of the same type injected by `enable_failure/4':
  %% `down' switches the proxy back on, `timeout' and `latency_up' turn the
  %% corresponding toxic off.
  heal_failure(FailureType, Name, ProxyHost, ProxyPort) ->
      case FailureType of
          latency_up ->
              latency_up_proxy(off, Name, ProxyHost, ProxyPort);
          timeout ->
              timeout_proxy(off, Name, ProxyHost, ProxyPort);
          down ->
              switch_proxy(on, Name, ProxyHost, ProxyPort)
      end.
  %% @doc Enable (`on') or disable (`off') proxy `Name' by POSTing
  %% `{"enabled": Bool}' to `/proxies/Name'; requires a 200 response.
  switch_proxy(Switch, Name, ProxyHost, ProxyPort) ->
      Url = lists:append([
          "http://", ProxyHost, ":", integer_to_list(ProxyPort), "/proxies/", Name
      ]),
      Enabled =
          case Switch of
              on -> true;
              off -> false
          end,
      Payload = emqx_utils_json:encode(#{<<"enabled">> => Enabled}),
      Request = {Url, [], "application/json", Payload},
      {ok, {{_, 200, _}, _, _}} = httpc:request(post, Request, [], [{body_format, binary}]).
  %% @doc Add (`on') or remove (`off') the toxic named `<Name>_timeout' on
  %% proxy `Name'. Adding POSTs an upstream `timeout' toxic (timeout 0,
  %% toxicity 1.0) and requires 200; removing issues a DELETE and requires 204.
  timeout_proxy(on, Name, ProxyHost, ProxyPort) ->
      Url = lists:append([
          "http://", ProxyHost, ":", integer_to_list(ProxyPort), "/proxies/", Name, "/toxics"
      ]),
      ToxicName = list_to_binary([Name, "_timeout"]),
      Toxic = #{
          <<"name">> => ToxicName,
          <<"type">> => <<"timeout">>,
          <<"stream">> => <<"upstream">>,
          <<"toxicity">> => 1.0,
          <<"attributes">> => #{<<"timeout">> => 0}
      },
      Request = {Url, [], "application/json", emqx_utils_json:encode(Toxic)},
      {ok, {{_, 200, _}, _, _}} = httpc:request(post, Request, [], [{body_format, binary}]);
  timeout_proxy(off, Name, ProxyHost, ProxyPort) ->
      Url = lists:append([
          "http://",
          ProxyHost,
          ":",
          integer_to_list(ProxyPort),
          "/proxies/",
          Name,
          "/toxics/",
          Name ++ "_timeout"
      ]),
      Request = {Url, [], "application/json", <<>>},
      {ok, {{_, 204, _}, _, _}} = httpc:request(delete, Request, [], [{body_format, binary}]).
  %% @doc Add (`on') or remove (`off') the toxic named `<Name>_latency_up'
  %% on proxy `Name'. Adding POSTs an upstream `latency' toxic (latency
  %% 20000, jitter 3000, toxicity 1.0) and requires 200; removing issues a
  %% DELETE and requires 204.
  latency_up_proxy(on, Name, ProxyHost, ProxyPort) ->
      Url = lists:append([
          "http://", ProxyHost, ":", integer_to_list(ProxyPort), "/proxies/", Name, "/toxics"
      ]),
      ToxicName = list_to_binary([Name, "_latency_up"]),
      Attributes = #{
          <<"latency">> => 20_000,
          <<"jitter">> => 3_000
      },
      Toxic = #{
          <<"name">> => ToxicName,
          <<"type">> => <<"latency">>,
          <<"stream">> => <<"upstream">>,
          <<"toxicity">> => 1.0,
          <<"attributes">> => Attributes
      },
      Request = {Url, [], "application/json", emqx_utils_json:encode(Toxic)},
      {ok, {{_, 200, _}, _, _}} = httpc:request(post, Request, [], [{body_format, binary}]);
  latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
      Url = lists:append([
          "http://",
          ProxyHost,
          ":",
          integer_to_list(ProxyPort),
          "/proxies/",
          Name,
          "/toxics/",
          Name ++ "_latency_up"
      ]),
      Request = {Url, [], "application/json", <<>>},
      {ok, {{_, 204, _}, _, _}} = httpc:request(delete, Request, [], [{body_format, binary}]).
  1098. %%-------------------------------------------------------------------------------
  1099. %% TLS certs
  1100. %%-------------------------------------------------------------------------------
  %% @doc Generate a self-signed CA under `Path' by shelling out to openssl.
  %% Produces `<Name>-ec.key' (EC parameters), `<Name>.key' and `<Name>.pem',
  %% which are later used to issue host and client certificates.
  %% Returns the output of the final openssl command.
  gen_ca(Path, Name) ->
      ParamsFile = filename(Path, "~s-ec.key", [Name]),
      filelib:ensure_dir(ParamsFile),
      os:cmd("openssl ecparam -name secp256r1 > " ++ ParamsFile),
      Template =
          "openssl req -new -x509 -nodes "
          "-newkey ec:~s "
          "-keyout ~s -out ~s -days 3650 "
          "-subj \"/C=SE/O=Internet Widgits Pty Ltd CA\"",
      Args = [ParamsFile, ca_key_name(Path, Name), ca_cert_name(Path, Name)],
      os:cmd(lists:flatten(io_lib:format(Template, Args))).
  %% @doc Path of the CA certificate file: `<Path>/<Name>.pem'.
  ca_cert_name(Path, Name) ->
      Args = [Name],
      filename(Path, "~s.pem", Args).
  %% @doc Path of the CA private key file: `<Path>/<Name>.key'.
  ca_key_name(Path, Name) ->
      Args = [Name],
      filename(Path, "~s.key", Args).
  %% @doc Same as `gen_host_cert/4' with no extra options.
  gen_host_cert(H, CaName, Path) ->
      NoOpts = #{},
      gen_host_cert(H, CaName, Path, NoOpts).
  %% @doc Issue a certificate for host `H', signed by the CA `CaName' whose
  %% files live under `Path', by shelling out to openssl.
  %%
  %% Writes `<H>.key', `<H>.csr' and `<H>.pem' under `Path'. `Opts' may
  %% contain `password'; when present the key is protected with it,
  %% otherwise the key is written unencrypted (`-nodes').
  %% Returns the result of deleting the temporary `<H>.extfile'.
  gen_host_cert(H, CaName, Path, Opts) ->
      ECKeyFile = filename(Path, "~s-ec.key", [CaName]),
      CN = str(H),
      HKey = filename(Path, "~s.key", [H]),
      HCSR = filename(Path, "~s.csr", [H]),
      HPEM = filename(Path, "~s.pem", [H]),
      HEXT = filename(Path, "~s.extfile", [H]),
      PasswordArg =
          case maps:get(password, Opts, undefined) of
              undefined ->
                  " -nodes ";
              Password ->
                  io_lib:format(" -passout pass:'~s' ", [Password])
          end,
      CSR_Cmd =
          lists:flatten(
              io_lib:format(
                  "openssl req -new ~s -newkey ec:~s "
                  "-keyout ~s -out ~s "
                  "-addext \"subjectAltName=DNS:~s\" "
                  "-addext keyUsage=digitalSignature,keyAgreement "
                  "-subj \"/C=SE/O=Internet Widgits Pty Ltd/CN=~s\"",
                  [PasswordArg, ECKeyFile, HKey, HCSR, CN, CN]
              )
          ),
      create_file(
          HEXT,
          "keyUsage=digitalSignature,keyAgreement\n"
          "subjectAltName=DNS:~s\n",
          [CN]
      ),
      CERT_Cmd =
          lists:flatten(
              io_lib:format(
                  "openssl x509 -req "
                  "-extfile ~s "
                  "-in ~s -CA ~s -CAkey ~s -CAcreateserial "
                  "-out ~s -days 500",
                  [
                      HEXT,
                      HCSR,
                      ca_cert_name(Path, CaName),
                      ca_key_name(Path, CaName),
                      HPEM
                  ]
              )
          ),
      %% Log the command output as data, never as the format string: a `~'
      %% anywhere in the openssl output would otherwise be parsed as a
      %% formatting directive.
      ct:pal("~ts", [os:cmd(CSR_Cmd)]),
      ct:pal("~ts", [os:cmd(CERT_Cmd)]),
      file:delete(HEXT).
  %% @doc Format a file name with `io_lib:format(F, A)' and join it onto `Path'.
  filename(Path, F, A) ->
      BaseName = str(io_lib:format(F, A)),
      filename:join(Path, BaseName).
  %% @doc Flatten an iolist (or binary) into a plain list of bytes.
  str(Arg) ->
      Bin = iolist_to_binary(Arg),
      binary_to_list(Bin).
  %% @doc Write `io:format(Fmt, Args)' output into `Filename', creating the
  %% parent directories first. The file is closed even if formatting fails.
  create_file(Filename, Fmt, Args) ->
      filelib:ensure_dir(Filename),
      {ok, Device} = file:open(Filename, [write]),
      try
          io:format(Device, Fmt, Args)
      after
          file:close(Device)
      end,
      ok.
  1190. %%-------------------------------------------------------------------------------
  1191. %% Testcase teardown utilities
  1192. %%-------------------------------------------------------------------------------
  %% Stop the janitor gracefully to ensure proper cleanup order and less
  %% noise in the logs. `call_janitor/0' uses a 15 second timeout.
  call_janitor() ->
      DefaultTimeout = 15_000,
      call_janitor(DefaultTimeout).

  %% Stops the janitor of the calling process (spawning one first if none
  %% exists) and forgets it, so a later call gets a fresh janitor.
  call_janitor(Timeout) ->
      ok = emqx_test_janitor:stop(get_or_spawn_janitor(), Timeout),
      _ = erase({?MODULE, janitor_proc}),
      ok.
  %% @doc Return the janitor stored in the calling process' dictionary,
  %% starting (and storing) a new linked one when none is stored yet.
  get_or_spawn_janitor() ->
      Key = {?MODULE, janitor_proc},
      case get(Key) of
          undefined ->
              {ok, Started} = emqx_test_janitor:start_link(),
              put(Key, Started),
              Started;
          Existing ->
              Existing
      end.
  %% @doc Register `Fun' as an on-exit callback with the calling process'
  %% janitor (see `get_or_spawn_janitor/0').
  on_exit(Fun) ->
      ok = emqx_test_janitor:push_on_exit_callback(get_or_spawn_janitor(), Fun).
  1214. %%-------------------------------------------------------------------------------
  1215. %% Select a free transport port from the OS
  1216. %%-------------------------------------------------------------------------------
  %% @doc Get an unused port from the OS for the given transport.
  %% `ssl' is probed like `tcp' and `quic' like `udp'.
  %% The return type is `inet:port_number()'; the `inets' module does not
  %% define a `port_number()' type.
  -spec select_free_port(tcp | udp | ssl | quic) -> inet:port_number().
  select_free_port(tcp) ->
      select_free_port(gen_tcp, listen);
  select_free_port(udp) ->
      select_free_port(gen_udp, open);
  select_free_port(ssl) ->
      select_free_port(tcp);
  select_free_port(quic) ->
      select_free_port(udp).
  %% @doc Ask the OS for an ephemeral port by opening a socket on port 0
  %% with `GenModule:Fun/2', reading the assigned port and closing the
  %% socket again. Only `gen_tcp' and `gen_udp' are accepted.
  select_free_port(GenModule, Fun) when GenModule =:= gen_tcp; GenModule =:= gen_udp ->
      {ok, Socket} = GenModule:Fun(0, [{reuseaddr, true}]),
      {ok, Port} = inet:port(Socket),
      ok = GenModule:close(Socket),
      %% in MacOS, still get address_in_use after close port
      os:type() =:= {unix, darwin} andalso timer:sleep(500),
      ct:pal("Select free OS port: ~p", [Port]),
      Port.