emqx_common_test_helpers.erl 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_common_test_helpers).
  17. -type special_config_handler() :: fun().
  18. -type apps() :: list(atom()).
  19. -export([
  20. all/1,
  21. matrix_to_groups/2,
  22. group_path/1,
  23. init_per_testcase/3,
  24. end_per_testcase/3,
  25. boot_modules/1,
  26. start_apps/1,
  27. start_apps/2,
  28. start_apps/3,
  29. start_app/2,
  30. stop_apps/1,
  31. stop_apps/2,
  32. reload/2,
  33. app_path/2,
  34. proj_root/0,
  35. deps_path/2,
  36. flush/0,
  37. flush/1,
  38. load/1,
  39. render_and_load_app_config/1,
  40. render_and_load_app_config/2,
  41. copy_acl_conf/0
  42. ]).
  43. -export([
  44. client_ssl/0,
  45. client_ssl/1,
  46. client_mtls/0,
  47. client_mtls/1,
  48. ensure_mnesia_stopped/0,
  49. ensure_quic_listener/2,
  50. ensure_quic_listener/3,
  51. is_all_tcp_servers_available/1,
  52. is_tcp_server_available/2,
  53. is_tcp_server_available/3,
  54. load_config/2,
  55. not_wait_mqtt_payload/1,
  56. read_schema_configs/2,
  57. render_config_file/2,
  58. wait_for/4,
  59. wait_publishes/2,
  60. wait_mqtt_payload/1,
  61. select_free_port/1
  62. ]).
  63. -export([
  64. ssl_verify_fun_allow_any_host/0,
  65. ssl_verify_fun_allow_any_host_impl/3
  66. ]).
  67. -export([
  68. emqx_cluster/1,
  69. emqx_cluster/2,
  70. start_ekka/0,
  71. start_epmd/0,
  72. start_peer/2,
  73. stop_peer/1,
  74. listener_port/2
  75. ]).
  76. -export([clear_screen/0]).
  77. -export([with_mock/4]).
  78. -export([
  79. on_exit/1,
  80. call_janitor/0,
  81. call_janitor/1
  82. ]).
  83. %% Toxiproxy API
  84. -export([
  85. with_failure/5,
  86. enable_failure/4,
  87. heal_failure/4,
  88. reset_proxy/2
  89. ]).
  90. %% TLS certs API
  91. -export([
  92. gen_ca/2,
  93. gen_host_cert/3,
  94. gen_host_cert/4
  95. ]).
  96. -define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
  97. -define(MQTT_SSL_CLIENT_CERTS, [
  98. {keyfile, ?CERTS_PATH("client-key.pem")},
  99. {cacertfile, ?CERTS_PATH("cacert.pem")},
  100. {certfile, ?CERTS_PATH("client-cert.pem")}
  101. ]).
  102. -define(TLS_1_3_CIPHERS, [
  103. {versions, ['tlsv1.3']},
  104. {ciphers, [
  105. "TLS_AES_256_GCM_SHA384",
  106. "TLS_AES_128_GCM_SHA256",
  107. "TLS_CHACHA20_POLY1305_SHA256",
  108. "TLS_AES_128_CCM_SHA256",
  109. "TLS_AES_128_CCM_8_SHA256"
  110. ]}
  111. ]).
  112. -define(TLS_OLD_CIPHERS, [
  113. {versions, ['tlsv1.1', 'tlsv1.2']},
  114. {ciphers, [
  115. "ECDHE-ECDSA-AES256-GCM-SHA384",
  116. "ECDHE-RSA-AES256-GCM-SHA384",
  117. "ECDHE-ECDSA-AES256-SHA384",
  118. "ECDHE-RSA-AES256-SHA384",
  119. "ECDHE-ECDSA-DES-CBC3-SHA",
  120. "ECDH-ECDSA-AES256-GCM-SHA384",
  121. "ECDH-RSA-AES256-GCM-SHA384",
  122. "ECDH-ECDSA-AES256-SHA384",
  123. "ECDH-RSA-AES256-SHA384",
  124. "DHE-DSS-AES256-GCM-SHA384",
  125. "DHE-DSS-AES256-SHA256",
  126. "AES256-GCM-SHA384",
  127. "AES256-SHA256",
  128. "ECDHE-ECDSA-AES128-GCM-SHA256",
  129. "ECDHE-RSA-AES128-GCM-SHA256",
  130. "ECDHE-ECDSA-AES128-SHA256",
  131. "ECDHE-RSA-AES128-SHA256",
  132. "ECDH-ECDSA-AES128-GCM-SHA256",
  133. "ECDH-RSA-AES128-GCM-SHA256",
  134. "ECDH-ECDSA-AES128-SHA256",
  135. "ECDH-RSA-AES128-SHA256",
  136. "DHE-DSS-AES128-GCM-SHA256",
  137. "DHE-DSS-AES128-SHA256",
  138. "AES128-GCM-SHA256",
  139. "AES128-SHA256",
  140. "ECDHE-ECDSA-AES256-SHA",
  141. "ECDHE-RSA-AES256-SHA",
  142. "DHE-DSS-AES256-SHA",
  143. "ECDH-ECDSA-AES256-SHA",
  144. "ECDH-RSA-AES256-SHA",
  145. "AES256-SHA",
  146. "ECDHE-ECDSA-AES128-SHA",
  147. "ECDHE-RSA-AES128-SHA",
  148. "DHE-DSS-AES128-SHA",
  149. "ECDH-ECDSA-AES128-SHA",
  150. "ECDH-RSA-AES128-SHA",
  151. "AES128-SHA"
  152. ]}
  153. ]).
  154. -define(DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT, 1000).
  155. %%------------------------------------------------------------------------------
  156. %% APIs
  157. %%------------------------------------------------------------------------------
  158. all(Suite) ->
  159. lists:usort([
  160. F
  161. || {F, 1} <- Suite:module_info(exports),
  162. string:substr(atom_to_list(F), 1, 2) == "t_"
  163. ]).
  164. init_per_testcase(Module, TestCase, Config) ->
  165. case erlang:function_exported(Module, TestCase, 2) of
  166. true -> Module:TestCase(init, Config);
  167. false -> Config
  168. end.
  169. end_per_testcase(Module, TestCase, Config) ->
  170. case erlang:function_exported(Module, TestCase, 2) of
  171. true -> Module:TestCase('end', Config);
  172. false -> ok
  173. end,
  174. Config.
  175. %% set emqx app boot modules
  176. -spec boot_modules(all | list(atom())) -> ok.
  177. boot_modules(Mods) ->
  178. application:set_env(emqx, boot_modules, Mods).
  179. -spec start_apps(Apps :: apps()) -> ok.
  180. start_apps(Apps) ->
  181. %% to avoid keeping the `db_hostname' that is set when loading
  182. %% `system_monitor' application in `emqx_machine', and then it
  183. %% crashing when trying to connect.
  184. %% FIXME: add an `enable' option to sysmon_top and use that to
  185. %% decide whether to start it or not.
  186. DefaultHandler =
  187. fun(_) ->
  188. application:set_env(system_monitor, db_hostname, ""),
  189. ok
  190. end,
  191. start_apps(Apps, DefaultHandler, #{}).
  192. -spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok.
  193. start_apps(Apps, SpecAppConfig) when is_function(SpecAppConfig) ->
  194. start_apps(Apps, SpecAppConfig, #{}).
  195. -spec start_apps(Apps :: apps(), Handler :: special_config_handler(), map()) -> ok.
  196. start_apps(Apps, SpecAppConfig, Opts) when is_function(SpecAppConfig) ->
  197. %% Load all application code to beam vm first
  198. %% Because, minirest, ekka etc.. application will scan these modules
  199. lists:foreach(fun load/1, [emqx | Apps]),
  200. ok = start_ekka(),
  201. ok = emqx_ratelimiter_SUITE:load_conf(),
  202. lists:foreach(fun(App) -> start_app(App, SpecAppConfig, Opts) end, [emqx | Apps]).
  203. load(App) ->
  204. case application:load(App) of
  205. ok -> ok;
  206. {error, {already_loaded, _}} -> ok;
  207. {error, Reason} -> error({failed_to_load_app, App, Reason})
  208. end.
  209. render_and_load_app_config(App) ->
  210. render_and_load_app_config(App, #{}).
  211. render_and_load_app_config(App, Opts) ->
  212. load(App),
  213. Schema = app_schema(App),
  214. ConfFilePath = maps:get(conf_file_path, Opts, filename:join(["etc", app_conf_file(App)])),
  215. Conf = app_path(App, ConfFilePath),
  216. try
  217. do_render_app_config(App, Schema, Conf, Opts)
  218. catch
  219. throw:skip ->
  220. ok;
  221. throw:E:St ->
  222. %% turn throw into error
  223. error({Conf, E, St})
  224. end.
  225. do_render_app_config(App, Schema, ConfigFile, Opts) ->
  226. %% copy acl_conf must run before read_schema_configs
  227. copy_acl_conf(),
  228. Vars = mustache_vars(App, Opts),
  229. RenderedConfigFile = render_config_file(ConfigFile, Vars),
  230. read_schema_configs(Schema, RenderedConfigFile),
  231. force_set_config_file_paths(App, [RenderedConfigFile]),
  232. copy_certs(App, RenderedConfigFile),
  233. ok.
  234. start_app(App, SpecAppConfig) ->
  235. start_app(App, SpecAppConfig, #{}).
  236. start_app(App, SpecAppConfig, Opts) ->
  237. render_and_load_app_config(App, Opts),
  238. SpecAppConfig(App),
  239. case application:ensure_all_started(App) of
  240. {ok, _} ->
  241. ok = ensure_dashboard_listeners_started(App),
  242. ok = wait_for_app_processes(App),
  243. ok = perform_sanity_checks(App),
  244. ok;
  245. {error, Reason} ->
  246. error({failed_to_start_app, App, Reason})
  247. end.
  248. wait_for_app_processes(emqx_conf) ->
  249. %% emqx_conf app has a gen_server which
  250. %% initializes its state asynchronously
  251. gen_server:call(emqx_cluster_rpc, dummy),
  252. ok;
  253. wait_for_app_processes(_) ->
  254. ok.
  255. %% These are checks to detect inter-suite or inter-testcase flakiness
  256. %% early. For example, one suite might forget one application running
  257. %% and stop others, and then the `application:start/2' callback is
  258. %% never called again for this application.
  259. perform_sanity_checks(emqx_rule_engine) ->
  260. ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']),
  261. ok;
  262. perform_sanity_checks(emqx_bridge) ->
  263. ensure_config_handler(emqx_bridge, [bridges]),
  264. ok;
  265. perform_sanity_checks(_App) ->
  266. ok.
  267. ensure_config_handler(Module, ConfigPath) ->
  268. #{handlers := Handlers} = emqx_config_handler:info(),
  269. case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
  270. #{'$mod' := Module} -> ok;
  271. NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
  272. end,
  273. ok.
  274. app_conf_file(emqx_conf) -> "emqx.conf.all";
  275. app_conf_file(App) -> atom_to_list(App) ++ ".conf".
  276. app_schema(App) ->
  277. Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
  278. try
  279. true = is_list(Mod:roots()),
  280. Mod
  281. catch
  282. error:undef ->
  283. no_schema
  284. end.
  285. mustache_vars(App, Opts) ->
  286. ExtraMustacheVars = maps:get(extra_mustache_vars, Opts, #{}),
  287. Defaults = #{
  288. node_cookie => atom_to_list(erlang:get_cookie()),
  289. platform_data_dir => app_path(App, "data"),
  290. platform_etc_dir => app_path(App, "etc")
  291. },
  292. maps:merge(Defaults, ExtraMustacheVars).
  293. render_config_file(ConfigFile, Vars0) ->
  294. Temp =
  295. case file:read_file(ConfigFile) of
  296. {ok, T} -> T;
  297. {error, enoent} -> throw(skip);
  298. {error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason})
  299. end,
  300. Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- maps:to_list(Vars0)],
  301. Targ = bbmustache:render(Temp, Vars),
  302. NewName = ConfigFile ++ ".rendered",
  303. ok = file:write_file(NewName, Targ),
  304. NewName.
  305. read_schema_configs(no_schema, _ConfigFile) ->
  306. ok;
  307. read_schema_configs(Schema, ConfigFile) ->
  308. NewConfig = generate_config(Schema, ConfigFile),
  309. application:set_env(NewConfig).
  310. generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
  311. {ok, Conf0} = hocon:load(ConfigFile, #{format => richmap}),
  312. hocon_tconf:generate(SchemaModule, Conf0).
  313. -spec stop_apps(list()) -> ok.
  314. stop_apps(Apps) ->
  315. stop_apps(Apps, #{}).
  316. stop_apps(Apps, Opts) ->
  317. [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
  318. ok = mria_mnesia:delete_schema(),
  319. %% to avoid inter-suite flakiness
  320. application:unset_env(emqx, config_loader),
  321. application:unset_env(emqx, boot_modules),
  322. emqx_schema_hooks:erase_injections(),
  323. case Opts of
  324. #{erase_all_configs := false} ->
  325. %% FIXME: this means inter-suite or inter-test dependencies
  326. ok;
  327. _ ->
  328. emqx_config:erase_all()
  329. end,
  330. ok = emqx_config:delete_override_conf_files(),
  331. application:unset_env(emqx, local_override_conf_file),
  332. application:unset_env(emqx, cluster_override_conf_file),
  333. application:unset_env(emqx, cluster_hocon_file),
  334. application:unset_env(gen_rpc, port_discovery),
  335. ok.
  336. proj_root() ->
  337. filename:join(
  338. lists:takewhile(
  339. fun(X) -> iolist_to_binary(X) =/= <<"_build">> end,
  340. filename:split(app_path(emqx, "."))
  341. )
  342. ).
  343. %% backward compatible
  344. deps_path(App, RelativePath) -> app_path(App, RelativePath).
  345. app_path(App, RelativePath) ->
  346. Lib = code:lib_dir(App),
  347. safe_relative_path(filename:join([Lib, RelativePath])).
  348. safe_relative_path(Path) ->
  349. case filename:split(Path) of
  350. ["/" | T] ->
  351. T1 = do_safe_relative_path(filename:join(T)),
  352. filename:join(["/", T1]);
  353. _ ->
  354. do_safe_relative_path(Path)
  355. end.
  356. do_safe_relative_path(Path) ->
  357. case safe_relative_path_2(Path) of
  358. unsafe -> Path;
  359. OK -> OK
  360. end.
  361. -if(?OTP_RELEASE < 23).
  362. safe_relative_path_2(Path) ->
  363. filename:safe_relative_path(Path).
  364. -else.
  365. safe_relative_path_2(Path) ->
  366. {ok, Cwd} = file:get_cwd(),
  367. filelib:safe_relative_path(Path, Cwd).
  368. -endif.
  369. -spec reload(App :: atom(), SpecAppConfig :: special_config_handler()) -> ok.
  370. reload(App, SpecAppConfigHandler) ->
  371. application:stop(App),
  372. start_app(App, SpecAppConfigHandler, #{}),
  373. application:start(App).
  374. ensure_mnesia_stopped() ->
  375. mria:stop(),
  376. mria_mnesia:delete_schema().
  377. %% Help function to wait for Fun to yield 'true'.
  378. wait_for(Fn, Ln, F, Timeout) ->
  379. {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
  380. wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
  381. wait_publishes(0, _Timeout) ->
  382. [];
  383. wait_publishes(Count, Timeout) ->
  384. receive
  385. {publish, Msg} ->
  386. [Msg | wait_publishes(Count - 1, Timeout)]
  387. after Timeout ->
  388. []
  389. end.
  390. flush() ->
  391. flush([]).
  392. flush(Msgs) ->
  393. receive
  394. M -> flush([M | Msgs])
  395. after 0 -> lists:reverse(Msgs)
  396. end.
  397. client_mtls() ->
  398. client_mtls(default).
  399. client_mtls(TLSVsn) ->
  400. ssl_verify_fun_allow_any_host() ++ client_certs() ++ ciphers(TLSVsn).
  401. %% Paths prepended to cert filenames
  402. client_certs() ->
  403. [{Key, app_path(emqx, FilePath)} || {Key, FilePath} <- ?MQTT_SSL_CLIENT_CERTS].
  404. client_ssl() ->
  405. client_ssl(default).
  406. client_ssl(TLSVsn) ->
  407. ciphers(TLSVsn) ++ [{reuse_sessions, true}].
  408. %% determined via config file defaults
  409. ciphers(default) -> [];
  410. ciphers('tlsv1.3') -> ?TLS_1_3_CIPHERS;
  411. ciphers(_OlderTLSVsn) -> ?TLS_OLD_CIPHERS.
  412. wait_mqtt_payload(Payload) ->
  413. receive
  414. {publish, #{payload := Payload}} ->
  415. ct:pal("OK - received msg: ~p~n", [Payload])
  416. after 1000 ->
  417. ct:fail({timeout, Payload, {msg_box, flush()}})
  418. end.
  419. not_wait_mqtt_payload(Payload) ->
  420. receive
  421. {publish, #{payload := Payload}} ->
  422. ct:fail({received, Payload})
  423. after 1000 ->
  424. ct:pal("OK - msg ~p is not received", [Payload])
  425. end.
  426. wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
  427. receive
  428. {'DOWN', Mref, process, Pid, normal} ->
  429. ok;
  430. {'DOWN', Mref, process, Pid, {unexpected, Result}} ->
  431. erlang:error({unexpected, Fn, Ln, Result});
  432. {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
  433. erlang:raise(C, {Fn, Ln, E}, S)
  434. after Timeout ->
  435. case Kill of
  436. true ->
  437. erlang:demonitor(Mref, [flush]),
  438. erlang:exit(Pid, kill),
  439. erlang:error({Fn, Ln, timeout});
  440. false ->
  441. Pid ! stop,
  442. wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
  443. end
  444. end.
  445. wait_loop(_F, ok) ->
  446. exit(normal);
  447. wait_loop(F, LastRes) ->
  448. receive
  449. stop -> erlang:exit(LastRes)
  450. after 100 ->
  451. Res = catch_call(F),
  452. wait_loop(F, Res)
  453. end.
  454. catch_call(F) ->
  455. try
  456. case F() of
  457. true -> ok;
  458. Other -> {unexpected, Other}
  459. end
  460. catch
  461. C:E:S ->
  462. {crashed, {C, E, S}}
  463. end.
  464. force_set_config_file_paths(emqx_conf, [Path] = Paths) ->
  465. Bin = iolist_to_binary(io_lib:format("node.config_files = [~p]~n", [Path])),
  466. ok = file:write_file(Path, Bin, [append]),
  467. application:set_env(emqx, config_files, Paths);
  468. force_set_config_file_paths(emqx, Paths) ->
  469. %% we need init cluster conf, so we can save the cluster conf to the file
  470. application:set_env(emqx, local_override_conf_file, "local_override.conf"),
  471. application:set_env(emqx, cluster_override_conf_file, "cluster_override.conf"),
  472. application:set_env(emqx, cluster_conf_file, "cluster.hocon"),
  473. application:set_env(emqx, config_files, Paths);
  474. force_set_config_file_paths(_, _) ->
  475. ok.
  476. copy_certs(emqx_conf, Dest0) ->
  477. Dest = filename:dirname(Dest0),
  478. From = string:replace(Dest, "emqx_conf", "emqx"),
  479. os:cmd(["cp -rf ", From, "/certs ", Dest, "/"]),
  480. ok;
  481. copy_certs(_, _) ->
  482. ok.
  483. copy_acl_conf() ->
  484. Dest = filename:join([code:lib_dir(emqx), "etc/acl.conf"]),
  485. case code:lib_dir(emqx_auth) of
  486. {error, bad_name} ->
  487. (not filelib:is_regular(Dest)) andalso file:write_file(Dest, <<"">>);
  488. _ ->
  489. {ok, _} = file:copy(deps_path(emqx_auth, "etc/acl.conf"), Dest)
  490. end,
  491. ok.
  492. load_config(SchemaModule, Config) ->
  493. ConfigBin =
  494. case is_map(Config) of
  495. true -> emqx_utils_json:encode(Config);
  496. false -> Config
  497. end,
  498. ok = emqx_config:delete_override_conf_files(),
  499. ok = copy_acl_conf(),
  500. ok = emqx_config:init_load(SchemaModule, ConfigBin).
  501. -spec is_all_tcp_servers_available(Servers) -> Result when
  502. Servers :: [{Host, Port}],
  503. Host :: inet:socket_address() | inet:hostname(),
  504. Port :: inet:port_number(),
  505. Result :: boolean().
  506. is_all_tcp_servers_available(Servers) ->
  507. Fun =
  508. fun({Host, Port}) ->
  509. is_tcp_server_available(Host, Port)
  510. end,
  511. case lists:partition(Fun, Servers) of
  512. {_, []} ->
  513. true;
  514. {_, Unavail} ->
  515. ct:pal("Unavailable servers: ~p", [Unavail]),
  516. false
  517. end.
  518. -spec is_tcp_server_available(
  519. Host :: inet:socket_address() | inet:hostname(),
  520. Port :: inet:port_number()
  521. ) -> boolean.
  522. is_tcp_server_available(Host, Port) ->
  523. is_tcp_server_available(Host, Port, ?DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT).
  524. -spec is_tcp_server_available(
  525. Host :: inet:socket_address() | inet:hostname(),
  526. Port :: inet:port_number(),
  527. Timeout :: integer()
  528. ) -> boolean.
  529. is_tcp_server_available(Host, Port, Timeout) ->
  530. case
  531. gen_tcp:connect(
  532. emqx_utils_conv:str(Host),
  533. emqx_utils_conv:int(Port),
  534. [],
  535. Timeout
  536. )
  537. of
  538. {ok, Socket} ->
  539. gen_tcp:close(Socket),
  540. true;
  541. {error, _} ->
  542. false
  543. end.
  544. start_ekka() ->
  545. try mnesia_hook:module_info() of
  546. _ -> ekka:start()
  547. catch
  548. _:_ ->
  549. %% Falling back to using Mnesia DB backend.
  550. application:set_env(mria, db_backend, mnesia),
  551. ekka:start()
  552. end.
  553. ensure_dashboard_listeners_started(emqx_dashboard) ->
  554. true = emqx_dashboard_listener:is_ready(infinity),
  555. ok;
  556. ensure_dashboard_listeners_started(_App) ->
  557. ok.
  558. -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number()) -> ok.
  559. ensure_quic_listener(Name, UdpPort) ->
  560. ensure_quic_listener(Name, UdpPort, #{}).
  561. -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number(), map()) -> ok.
  562. ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
  563. application:ensure_all_started(quicer),
  564. Conf = #{
  565. acceptors => 16,
  566. bind => UdpPort,
  567. ciphers =>
  568. [
  569. "TLS_AES_256_GCM_SHA384",
  570. "TLS_AES_128_GCM_SHA256",
  571. "TLS_CHACHA20_POLY1305_SHA256"
  572. ],
  573. enable => true,
  574. idle_timeout => 15000,
  575. ssl_options => #{
  576. certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
  577. keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem")
  578. },
  579. limiter => #{},
  580. max_connections => 1024000,
  581. mountpoint => <<>>,
  582. zone => default
  583. },
  584. Conf2 = maps:merge(Conf, ExtraSettings),
  585. emqx_config:put([listeners, quic, Name], Conf2),
  586. case emqx_listeners:start_listener(emqx_listeners:listener_id(quic, Name)) of
  587. ok -> ok;
  588. {error, {already_started, _Pid}} -> ok;
  589. Other -> throw(Other)
  590. end.
  591. %%
  592. %% Clusterisation and multi-node testing
  593. %%
  594. -type cluster_spec() :: [node_spec()].
  595. -type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
  596. -type role() :: core | replicant.
  597. -type shortname() :: atom().
  598. -type nodename() :: atom().
  599. -type node_opts() :: #{
  600. %% Need to loaded apps. These apps will be loaded once the node started
  601. load_apps => list(),
  602. %% Need to started apps. It is the first arg passed to emqx_common_test_helpers:start_apps/2
  603. apps => list(),
  604. %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2
  605. env_handler => fun((AppName :: atom()) -> term()),
  606. %% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
  607. env => [{AppName :: atom(), Key :: atom(), Val :: term()}],
  608. %% Whether to execute `emqx_config:init_load(SchemaMod)`
  609. %% default: true
  610. load_schema => boolean(),
  611. %% Which node in the cluster to join to.
  612. %% default: first core node
  613. join_to => node(),
  614. %% If we want to exercise the scenario where a node joins an
  615. %% existing cluster where there has already been some
  616. %% configuration changes (via cluster rpc), then we need to enable
  617. %% autocluster so that the joining node will restart the
  618. %% `emqx_conf' app and correctly catch up the config.
  619. start_autocluster => boolean(),
  620. %% Eval by emqx_config:put/2
  621. conf => [{KeyPath :: list(), Val :: term()}],
  622. %% Fast option to config listener port
  623. %% default rule:
  624. %% - tcp: base_port
  625. %% - ssl: base_port + 1
  626. %% - ws : base_port + 3
  627. %% - wss: base_port + 4
  628. listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
  629. }.
  630. -spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
  631. emqx_cluster(Specs) ->
  632. emqx_cluster(Specs, #{}).
  633. -spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
  634. emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
  635. emqx_cluster(Specs, maps:from_list(CommonOpts));
  636. emqx_cluster(Specs0, CommonOpts) ->
  637. Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
  638. Specs = expand_node_specs(Specs1, CommonOpts),
  639. %% Assign grpc ports
  640. GenRpcPorts = maps:from_list([
  641. {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
  642. || {{_, Name, _}, Num} <- Specs
  643. ]),
  644. %% Set the default node of the cluster:
  645. CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
  646. JoinTo =
  647. case CoreNodes of
  648. [First | _] -> First;
  649. _ -> undefined
  650. end,
  651. NodeOpts = fun(Number) ->
  652. #{
  653. base_port => base_port(Number),
  654. env => [
  655. {mria, core_nodes, CoreNodes},
  656. {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
  657. ]
  658. }
  659. end,
  660. RoleOpts = fun
  661. (core) ->
  662. #{
  663. join_to => JoinTo,
  664. env => [
  665. {mria, node_role, core}
  666. ]
  667. };
  668. (replicant) ->
  669. #{
  670. env => [
  671. {mria, node_role, replicant},
  672. {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
  673. ]
  674. }
  675. end,
  676. [
  677. {Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)}
  678. || {{Role, Name, Opts}, Number} <- Specs
  679. ].
  680. %% Lower level starting API
  681. -spec start_peer(shortname(), node_opts()) -> nodename().
  682. start_peer(Name, Opts) when is_list(Opts) ->
  683. start_peer(Name, maps:from_list(Opts));
  684. start_peer(Name, Opts) when is_map(Opts) ->
  685. Node = node_name(Name),
  686. Cookie = atom_to_list(erlang:get_cookie()),
  687. PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
  688. NodeDataDir = filename:join([
  689. PrivDataDir,
  690. Node,
  691. integer_to_list(erlang:unique_integer())
  692. ]),
  693. DoStart =
  694. fun() ->
  695. ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]),
  696. Envs = [
  697. {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
  698. {"EMQX_NODE__COOKIE", Cookie},
  699. {"EMQX_NODE__DATA_DIR", NodeDataDir}
  700. ],
  701. emqx_cth_peer:start(Node, erl_flags(), Envs)
  702. end,
  703. case DoStart() of
  704. {ok, _} ->
  705. ok;
  706. {error, started_not_connected, _} ->
  707. ok;
  708. Other ->
  709. throw(Other)
  710. end,
  711. pong = net_adm:ping(Node),
  712. ok = snabbkaffe:forward_trace(Node),
  713. setup_node(Node, Opts),
  714. Node.
  715. %% Node stopping
  716. stop_peer(Node0) ->
  717. Node = node_name(Node0),
  718. emqx_cth_peer:stop(Node).
  719. %% EPMD starting
  720. start_epmd() ->
  721. [] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
  722. ok.
  723. epmd_path() ->
  724. case os:find_executable("epmd") of
  725. false ->
  726. ct:pal(critical, "Could not find epmd.~n"),
  727. exit(epmd_not_found);
  728. GlobalEpmd ->
  729. GlobalEpmd
  730. end.
  731. %% Node initialization
  732. -spec setup_node(nodename(), node_opts()) -> ok.
  733. setup_node(Node, Opts) when is_list(Opts) ->
  734. setup_node(Node, maps:from_list(Opts));
  735. setup_node(Node, Opts) when is_map(Opts) ->
  736. %% Default base port is selected upon Node from 1100 to 65530 with step 10
  737. BasePort = maps:get(base_port, Opts, 1100 + erlang:phash2(Node, 6553 - 110) * 10),
  738. Apps = maps:get(apps, Opts, []),
  739. StartApps = maps:get(start_apps, Opts, true),
  740. JoinTo = maps:get(join_to, Opts, undefined),
  741. EnvHandler = maps:get(env_handler, Opts, fun(_) -> ok end),
  742. ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true),
  743. LoadSchema = maps:get(load_schema, Opts, true),
  744. SchemaMod = maps:get(schema_mod, Opts, emqx_schema),
  745. LoadApps = maps:get(load_apps, Opts, Apps),
  746. Env = maps:get(env, Opts, []),
  747. Conf = maps:get(conf, Opts, []),
  748. ListenerPorts = maps:get(listener_ports, Opts, [
  749. {Type, listener_port(BasePort, Type)}
  750. || Type <- [tcp, ssl, ws, wss]
  751. ]),
  752. %% we need a fresh data dir for each peer node to avoid unintended
  753. %% successes due to sharing of data in the cluster.
  754. PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
  755. %% If we want to exercise the scenario where a node joins an
  756. %% existing cluster where there has already been some
  757. %% configuration changes (via cluster rpc), then we need to enable
  758. %% autocluster so that the joining node will restart the
  759. %% `emqx_conf' app and correctly catch up the config.
  760. StartAutocluster = maps:get(start_autocluster, Opts, false),
  761. ct:pal(
  762. "setting up node ~p:\n ~p",
  763. [
  764. Node,
  765. #{
  766. start_autocluster => StartAutocluster,
  767. load_apps => LoadApps,
  768. apps => Apps,
  769. env => Env,
  770. join_to => JoinTo,
  771. start_apps => StartApps
  772. }
  773. ]
  774. ),
  775. %% Load env before doing anything to avoid overriding
  776. [ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]],
  777. %% Ensure a clean mnesia directory for each run to avoid
  778. %% inter-test flakiness.
  779. MnesiaDataDir = filename:join([
  780. PrivDataDir,
  781. Node,
  782. integer_to_list(erlang:unique_integer()),
  783. "mnesia"
  784. ]),
  785. case erpc:call(Node, application, get_env, [mnesia, dir, undefined]) of
  786. undefined ->
  787. ct:pal("~p: setting mnesia dir: ~p", [Node, MnesiaDataDir]),
  788. erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]);
  789. PreviousMnesiaDir ->
  790. ct:pal("~p: mnesia dir already set: ~p", [Node, PreviousMnesiaDir]),
  791. ok
  792. end,
  793. %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
  794. %% in emqx_common_test_helpers:start_apps(...)
  795. ConfigureGenRpc andalso
  796. begin
  797. ok = rpc:call(Node, application, set_env, [
  798. gen_rpc, tcp_server_port, gen_rpc_port(BasePort)
  799. ]),
  800. ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual])
  801. end,
  802. %% Setting env before starting any applications
  803. set_envs(Node, Env),
  804. NodeDataDir = filename:join([
  805. PrivDataDir,
  806. node(),
  807. integer_to_list(erlang:unique_integer())
  808. ]),
  809. %% Here we start the apps
  810. EnvHandlerForRpc =
  811. fun(App) ->
  812. %% We load configuration, and than set the special environment variable
  813. %% which says that emqx shouldn't load configuration at startup
  814. %% Otherwise, configuration gets loaded and all preset env in EnvHandler is lost
  815. LoadSchema andalso
  816. begin
  817. %% to avoid sharing data between executions and/or
  818. %% nodes. these variables might not be in the
  819. %% config file (e.g.: emqx_enterprise_schema).
  820. Cookie = atom_to_list(erlang:get_cookie()),
  821. set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
  822. set_env_once("EMQX_NODE__COOKIE", Cookie),
  823. emqx_config:init_load(SchemaMod),
  824. emqx_app:set_config_loader(emqx_conf)
  825. end,
  826. %% Need to set this otherwise listeners will conflict between each other
  827. [
  828. ok = emqx_config:put([listeners, Type, default, bind], {
  829. {127, 0, 0, 1}, Port
  830. })
  831. || {Type, Port} <- ListenerPorts
  832. ],
  833. [ok = emqx_config:put(KeyPath, Value) || {KeyPath, Value} <- Conf],
  834. ok = EnvHandler(App),
  835. ok
  836. end,
  837. StartApps andalso
  838. begin
  839. ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandlerForRpc])
  840. end,
  841. %% Join the cluster if JoinTo is specified
  842. case JoinTo of
  843. undefined ->
  844. ok;
  845. _ ->
  846. StartAutocluster andalso
  847. begin
  848. %% Note: we need to re-set the env because
  849. %% starting the apps apparently make some of them
  850. %% to be lost... This is particularly useful for
  851. %% setting extra apps to be restarted after
  852. %% joining.
  853. set_envs(Node, Env),
  854. ok = erpc:call(Node, emqx_machine_boot, start_autocluster, [])
  855. end,
  856. case rpc:call(Node, ekka, join, [JoinTo]) of
  857. ok ->
  858. ok;
  859. ignore ->
  860. ok;
  861. Err ->
  862. stop_peer(Node),
  863. error({failed_to_join_cluster, #{node => Node, error => Err}})
  864. end
  865. end,
  866. ok.
  867. %% Helpers
  868. set_env_once(Var, Value) ->
  869. case os:getenv(Var) of
  870. false ->
  871. os:putenv(Var, Value);
  872. _OldValue ->
  873. ok
  874. end,
  875. ok.
  876. node_name(Name) ->
  877. case string:tokens(atom_to_list(Name), "@") of
  878. [_Name, _Host] ->
  879. %% the name already has a @
  880. Name;
  881. _ ->
  882. list_to_atom(atom_to_list(Name) ++ "@" ++ host())
  883. end.
  884. gen_node_name(Num) ->
  885. list_to_atom("autocluster_node" ++ integer_to_list(Num)).
  886. host() ->
  887. [_, Host] = string:tokens(atom_to_list(node()), "@"),
  888. Host.
  889. merge_opts(Opts1, Opts2) ->
  890. maps:merge_with(
  891. fun
  892. (env, Env1, Env2) -> lists:usort(Env2 ++ Env1);
  893. (conf, Conf1, Conf2) -> lists:usort(Conf2 ++ Conf1);
  894. (apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
  895. (load_apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
  896. (_Option, _Old, Value) -> Value
  897. end,
  898. Opts1,
  899. Opts2
  900. ).
  901. set_envs(Node, Env) ->
  902. lists:foreach(
  903. fun({Application, Key, Value}) ->
  904. ok = rpc:call(Node, application, set_env, [Application, Key, Value])
  905. end,
  906. Env
  907. ).
  908. erl_flags() ->
  909. %% One core
  910. ["+S", "1:1"] ++ ebin_path().
  911. ebin_path() ->
  912. ["-pa" | lists:filter(fun is_lib/1, code:get_path())].
  913. is_lib(Path) ->
  914. string:prefix(Path, code:lib_dir()) =:= nomatch andalso
  915. string:str(Path, "_build/default/plugins") =:= 0.
  916. %% Ports
  917. base_port(Number) ->
  918. 10000 + Number * 100.
  919. gen_rpc_port(BasePort) ->
  920. BasePort - 1.
  921. listener_port(Opts, Type) when is_map(Opts) ->
  922. BasePort = maps:get(base_port, Opts),
  923. listener_port(BasePort, Type);
  924. listener_port(BasePort, tcp) ->
  925. BasePort;
  926. listener_port(BasePort, ssl) ->
  927. BasePort + 1;
  928. listener_port(BasePort, quic) ->
  929. BasePort + 2;
  930. listener_port(BasePort, ws) ->
  931. BasePort + 3;
  932. listener_port(BasePort, wss) ->
  933. BasePort + 4.
  934. %% Autocluster helpers
  935. expand_node_specs(Specs, CommonOpts) ->
  936. lists:map(
  937. fun({Spec, Num}) ->
  938. {
  939. case Spec of
  940. core ->
  941. {core, gen_node_name(Num), CommonOpts};
  942. replicant ->
  943. {replicant, gen_node_name(Num), CommonOpts};
  944. {Role, Name} when is_atom(Name) ->
  945. {Role, Name, CommonOpts};
  946. {Role, Opts} when is_list(Opts) ->
  947. Opts1 = maps:from_list(Opts),
  948. {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts1)};
  949. {Role, Name, Opts} when is_list(Opts) ->
  950. Opts1 = maps:from_list(Opts),
  951. {Role, Name, merge_opts(CommonOpts, Opts1)};
  952. {Role, Opts} ->
  953. {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts)};
  954. {Role, Name, Opts} ->
  955. {Role, Name, merge_opts(CommonOpts, Opts)}
  956. end,
  957. Num
  958. }
  959. end,
  960. Specs
  961. ).
  962. %% Useful when iterating on the tests in a loop, to get rid of all the garbaged printed
  963. %% before the test itself beings.
  964. %% Only actually does anything if the environment variable `CLEAR_SCREEN' is set to `true'
  965. %% and only clears the screen the screen the first time it's encountered, so it's harmless
  966. %% otherwise.
  967. clear_screen() ->
  968. Key = {?MODULE, clear_screen},
  969. case {os:getenv("CLEAR_SCREEN"), persistent_term:get(Key, false)} of
  970. {"true", false} ->
  971. io:format(standard_io, "\033[H\033[2J", []),
  972. io:format(standard_error, "\033[H\033[2J", []),
  973. io:format(standard_io, "\033[H\033[3J", []),
  974. io:format(standard_error, "\033[H\033[3J", []),
  975. persistent_term:put(Key, true),
  976. ok;
  977. _ ->
  978. ok
  979. end.
  980. with_mock(Mod, FnName, MockedFn, Fun) ->
  981. ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
  982. ok = meck:expect(Mod, FnName, MockedFn),
  983. try
  984. Fun()
  985. after
  986. ok = meck:unload(Mod)
  987. end.
  988. %%-------------------------------------------------------------------------------
  989. %% Toxiproxy utils
  990. %%-------------------------------------------------------------------------------
  991. reset_proxy(ProxyHost, ProxyPort) ->
  992. Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset",
  993. Body = <<>>,
  994. {ok, {{_, 204, _}, _, _}} = httpc:request(
  995. post,
  996. {Url, [], "application/json", Body},
  997. [],
  998. [{body_format, binary}]
  999. ).
  1000. with_failure(FailureType, Name, ProxyHost, ProxyPort, Fun) ->
  1001. enable_failure(FailureType, Name, ProxyHost, ProxyPort),
  1002. try
  1003. Fun()
  1004. after
  1005. heal_failure(FailureType, Name, ProxyHost, ProxyPort)
  1006. end.
  1007. enable_failure(FailureType, Name, ProxyHost, ProxyPort) ->
  1008. case FailureType of
  1009. down -> switch_proxy(off, Name, ProxyHost, ProxyPort);
  1010. timeout -> timeout_proxy(on, Name, ProxyHost, ProxyPort);
  1011. latency_up -> latency_up_proxy(on, Name, ProxyHost, ProxyPort)
  1012. end.
  1013. heal_failure(FailureType, Name, ProxyHost, ProxyPort) ->
  1014. case FailureType of
  1015. down -> switch_proxy(on, Name, ProxyHost, ProxyPort);
  1016. timeout -> timeout_proxy(off, Name, ProxyHost, ProxyPort);
  1017. latency_up -> latency_up_proxy(off, Name, ProxyHost, ProxyPort)
  1018. end.
  1019. switch_proxy(Switch, Name, ProxyHost, ProxyPort) ->
  1020. Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name,
  1021. Body =
  1022. case Switch of
  1023. off -> #{<<"enabled">> => false};
  1024. on -> #{<<"enabled">> => true}
  1025. end,
  1026. BodyBin = emqx_utils_json:encode(Body),
  1027. {ok, {{_, 200, _}, _, _}} = httpc:request(
  1028. post,
  1029. {Url, [], "application/json", BodyBin},
  1030. [],
  1031. [{body_format, binary}]
  1032. ).
  1033. timeout_proxy(on, Name, ProxyHost, ProxyPort) ->
  1034. Url =
  1035. "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
  1036. "/toxics",
  1037. NameBin = list_to_binary(Name),
  1038. Body = #{
  1039. <<"name">> => <<NameBin/binary, "_timeout">>,
  1040. <<"type">> => <<"timeout">>,
  1041. <<"stream">> => <<"upstream">>,
  1042. <<"toxicity">> => 1.0,
  1043. <<"attributes">> => #{<<"timeout">> => 0}
  1044. },
  1045. BodyBin = emqx_utils_json:encode(Body),
  1046. {ok, {{_, 200, _}, _, _}} = httpc:request(
  1047. post,
  1048. {Url, [], "application/json", BodyBin},
  1049. [],
  1050. [{body_format, binary}]
  1051. );
  1052. timeout_proxy(off, Name, ProxyHost, ProxyPort) ->
  1053. ToxicName = Name ++ "_timeout",
  1054. Url =
  1055. "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
  1056. "/toxics/" ++ ToxicName,
  1057. Body = <<>>,
  1058. {ok, {{_, 204, _}, _, _}} = httpc:request(
  1059. delete,
  1060. {Url, [], "application/json", Body},
  1061. [],
  1062. [{body_format, binary}]
  1063. ).
  1064. latency_up_proxy(on, Name, ProxyHost, ProxyPort) ->
  1065. Url =
  1066. "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
  1067. "/toxics",
  1068. NameBin = list_to_binary(Name),
  1069. Body = #{
  1070. <<"name">> => <<NameBin/binary, "_latency_up">>,
  1071. <<"type">> => <<"latency">>,
  1072. <<"stream">> => <<"upstream">>,
  1073. <<"toxicity">> => 1.0,
  1074. <<"attributes">> => #{
  1075. <<"latency">> => 20_000,
  1076. <<"jitter">> => 3_000
  1077. }
  1078. },
  1079. BodyBin = emqx_utils_json:encode(Body),
  1080. {ok, {{_, 200, _}, _, _}} = httpc:request(
  1081. post,
  1082. {Url, [], "application/json", BodyBin},
  1083. [],
  1084. [{body_format, binary}]
  1085. );
  1086. latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
  1087. ToxicName = Name ++ "_latency_up",
  1088. Url =
  1089. "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
  1090. "/toxics/" ++ ToxicName,
  1091. Body = <<>>,
  1092. {ok, {{_, 204, _}, _, _}} = httpc:request(
  1093. delete,
  1094. {Url, [], "application/json", Body},
  1095. [],
  1096. [{body_format, binary}]
  1097. ).
  1098. %%-------------------------------------------------------------------------------
  1099. %% TLS certs
  1100. %%-------------------------------------------------------------------------------
  1101. gen_ca(Path, Name) ->
  1102. %% Generate ca.pem and ca.key which will be used to generate certs
  1103. %% for hosts server and clients
  1104. ECKeyFile = filename(Path, "~s-ec.key", [Name]),
  1105. filelib:ensure_dir(ECKeyFile),
  1106. os:cmd("openssl ecparam -name secp256r1 > " ++ ECKeyFile),
  1107. Cmd = lists:flatten(
  1108. io_lib:format(
  1109. "openssl req -new -x509 -nodes "
  1110. "-newkey ec:~s "
  1111. "-keyout ~s -out ~s -days 3650 "
  1112. "-subj \"/C=SE/O=Internet Widgits Pty Ltd CA\"",
  1113. [
  1114. ECKeyFile,
  1115. ca_key_name(Path, Name),
  1116. ca_cert_name(Path, Name)
  1117. ]
  1118. )
  1119. ),
  1120. os:cmd(Cmd).
  1121. ca_cert_name(Path, Name) ->
  1122. filename(Path, "~s.pem", [Name]).
  1123. ca_key_name(Path, Name) ->
  1124. filename(Path, "~s.key", [Name]).
  1125. gen_host_cert(H, CaName, Path) ->
  1126. gen_host_cert(H, CaName, Path, #{}).
  1127. gen_host_cert(H, CaName, Path, Opts) ->
  1128. ECKeyFile = filename(Path, "~s-ec.key", [CaName]),
  1129. CN = str(H),
  1130. HKey = filename(Path, "~s.key", [H]),
  1131. HCSR = filename(Path, "~s.csr", [H]),
  1132. HPEM = filename(Path, "~s.pem", [H]),
  1133. HEXT = filename(Path, "~s.extfile", [H]),
  1134. PasswordArg =
  1135. case maps:get(password, Opts, undefined) of
  1136. undefined ->
  1137. " -nodes ";
  1138. Password ->
  1139. io_lib:format(" -passout pass:'~s' ", [Password])
  1140. end,
  1141. CSR_Cmd =
  1142. lists:flatten(
  1143. io_lib:format(
  1144. "openssl req -new ~s -newkey ec:~s "
  1145. "-keyout ~s -out ~s "
  1146. "-addext \"subjectAltName=DNS:~s\" "
  1147. "-addext keyUsage=digitalSignature,keyAgreement "
  1148. "-subj \"/C=SE/O=Internet Widgits Pty Ltd/CN=~s\"",
  1149. [PasswordArg, ECKeyFile, HKey, HCSR, CN, CN]
  1150. )
  1151. ),
  1152. create_file(
  1153. HEXT,
  1154. "keyUsage=digitalSignature,keyAgreement\n"
  1155. "subjectAltName=DNS:~s\n",
  1156. [CN]
  1157. ),
  1158. CERT_Cmd =
  1159. lists:flatten(
  1160. io_lib:format(
  1161. "openssl x509 -req "
  1162. "-extfile ~s "
  1163. "-in ~s -CA ~s -CAkey ~s -CAcreateserial "
  1164. "-out ~s -days 500",
  1165. [
  1166. HEXT,
  1167. HCSR,
  1168. ca_cert_name(Path, CaName),
  1169. ca_key_name(Path, CaName),
  1170. HPEM
  1171. ]
  1172. )
  1173. ),
  1174. ct:pal(os:cmd(CSR_Cmd)),
  1175. ct:pal(os:cmd(CERT_Cmd)),
  1176. file:delete(HEXT).
  1177. filename(Path, F, A) ->
  1178. filename:join(Path, str(io_lib:format(F, A))).
  1179. str(Arg) ->
  1180. binary_to_list(iolist_to_binary(Arg)).
  1181. create_file(Filename, Fmt, Args) ->
  1182. filelib:ensure_dir(Filename),
  1183. {ok, F} = file:open(Filename, [write]),
  1184. try
  1185. io:format(F, Fmt, Args)
  1186. after
  1187. file:close(F)
  1188. end,
  1189. ok.
  1190. %%-------------------------------------------------------------------------------
  1191. %% Testcase teardown utilities
  1192. %%-------------------------------------------------------------------------------
  1193. %% stop the janitor gracefully to ensure proper cleanup order and less
  1194. %% noise in the logs.
  1195. call_janitor() ->
  1196. call_janitor(15_000).
  1197. call_janitor(Timeout) ->
  1198. Janitor = get_or_spawn_janitor(),
  1199. ok = emqx_test_janitor:stop(Janitor, Timeout),
  1200. erase({?MODULE, janitor_proc}),
  1201. ok.
  1202. get_or_spawn_janitor() ->
  1203. case get({?MODULE, janitor_proc}) of
  1204. undefined ->
  1205. {ok, Janitor} = emqx_test_janitor:start_link(),
  1206. put({?MODULE, janitor_proc}, Janitor),
  1207. Janitor;
  1208. Janitor ->
  1209. Janitor
  1210. end.
  1211. on_exit(Fun) ->
  1212. Janitor = get_or_spawn_janitor(),
  1213. ok = emqx_test_janitor:push_on_exit_callback(Janitor, Fun).
  1214. %%-------------------------------------------------------------------------------
  1215. %% Select a free transport port from the OS
  1216. %%-------------------------------------------------------------------------------
  1217. %% @doc get unused port from OS
  1218. -spec select_free_port(tcp | udp | ssl | quic) -> inets:port_number().
  1219. select_free_port(tcp) ->
  1220. select_free_port(gen_tcp, listen);
  1221. select_free_port(udp) ->
  1222. select_free_port(gen_udp, open);
  1223. select_free_port(ssl) ->
  1224. select_free_port(tcp);
  1225. select_free_port(quic) ->
  1226. select_free_port(udp).
  1227. select_free_port(GenModule, Fun) when
  1228. GenModule == gen_tcp orelse
  1229. GenModule == gen_udp
  1230. ->
  1231. {ok, S} = GenModule:Fun(0, [{reuseaddr, true}]),
  1232. {ok, Port} = inet:port(S),
  1233. ok = GenModule:close(S),
  1234. case os:type() of
  1235. {unix, darwin} ->
  1236. %% in MacOS, still get address_in_use after close port
  1237. timer:sleep(500);
  1238. _ ->
  1239. skip
  1240. end,
  1241. ct:pal("Select free OS port: ~p", [Port]),
  1242. Port.
  1243. %% Generate ct sub-groups from test-case's 'matrix' clause
  1244. %% NOTE: the test cases must have a root group name which
  1245. %% is unkonwn to this API.
  1246. %%
  1247. %% e.g.
  1248. %% all() -> [{group, g1}].
  1249. %%
  1250. %% groups() ->
  1251. %% emqx_common_test_helpers:groups(?MODULE, [case1, case2]).
  1252. %%
  1253. %% case1(matrix) ->
  1254. %% {g1, [[tcp, no_auth],
  1255. %% [ssl, no_auth],
  1256. %% [ssl, basic_auth]
  1257. %% ]};
  1258. %%
  1259. %% case2(matrix) ->
  1260. %% {g1, ...}
  1261. %% ...
  1262. %%
  1263. %% Return:
  1264. %%
  1265. %% [{g1, [],
  1266. %% [ {tcp, [], [{no_auth, [], [case1, case2]}
  1267. %% ]},
  1268. %% {ssl, [], [{no_auth, [], [case1, case2]},
  1269. %% {basic_auth, [], [case1, case2]}
  1270. %% ]}
  1271. %% ]
  1272. %% }
  1273. %% ]
  1274. matrix_to_groups(Module, Cases) ->
  1275. lists:foldr(
  1276. fun(Case, Acc) ->
  1277. add_case_matrix(Module, Case, Acc)
  1278. end,
  1279. [],
  1280. Cases
  1281. ).
  1282. add_case_matrix(Module, TestCase, Acc0) ->
  1283. {MaybeRootGroup, Matrix} =
  1284. case Module:TestCase(matrix) of
  1285. {RootGroup0, Matrix0} ->
  1286. {RootGroup0, Matrix0};
  1287. Matrix0 ->
  1288. {undefined, Matrix0}
  1289. end,
  1290. lists:foldr(
  1291. fun(Row, Acc) ->
  1292. case MaybeRootGroup of
  1293. undefined ->
  1294. add_group(Row, Acc, TestCase);
  1295. RootGroup ->
  1296. add_group([RootGroup | Row], Acc, TestCase)
  1297. end
  1298. end,
  1299. Acc0,
  1300. Matrix
  1301. ).
  1302. add_group([], Acc, TestCase) ->
  1303. case lists:member(TestCase, Acc) of
  1304. true ->
  1305. Acc;
  1306. false ->
  1307. [TestCase | Acc]
  1308. end;
  1309. add_group([Name | More], Acc, TestCases) ->
  1310. case lists:keyfind(Name, 1, Acc) of
  1311. false ->
  1312. [{Name, [], add_group(More, [], TestCases)} | Acc];
  1313. {Name, [], SubGroup} ->
  1314. New = {Name, [], add_group(More, SubGroup, TestCases)},
  1315. lists:keystore(Name, 1, Acc, New)
  1316. end.
  1317. group_path(Config) ->
  1318. try
  1319. Current = proplists:get_value(tc_group_properties, Config),
  1320. NameF = fun(Props) ->
  1321. {name, Name} = lists:keyfind(name, 1, Props),
  1322. Name
  1323. end,
  1324. Stack = proplists:get_value(tc_group_path, Config),
  1325. lists:reverse(lists:map(NameF, [Current | Stack]))
  1326. catch
  1327. _:_ ->
  1328. []
  1329. end.
  1330. %% almost verify_none equivalent, but only ignores 'hostname_check_failed'
  1331. ssl_verify_fun_allow_any_host_impl(_Cert, Event, State) ->
  1332. case Event of
  1333. valid ->
  1334. {valid, State};
  1335. valid_peer ->
  1336. {valid, State};
  1337. {bad_cert, hostname_check_failed} ->
  1338. {valid, State};
  1339. {bad_cert, _} ->
  1340. {fail, Event};
  1341. {extension, _} ->
  1342. {unknown, State}
  1343. end.
  1344. ssl_verify_fun_allow_any_host() ->
  1345. [
  1346. {verify, verify_peer},
  1347. {verify_fun, {fun ?MODULE:ssl_verify_fun_allow_any_host_impl/3, _State = #{}}}
  1348. ].