emqx_plugins_SUITE.erl 37 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  %% Application name of the demo plugin used by most test cases.
  -define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin).
  %% Release (package) name of the demo plugin: the application name as a string.
  -define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)).
  %% Base download URL; get_demo_plugin_package/1 appends the tag and file name.
  -define(EMQX_PLUGIN_TEMPLATE_URL,
      "https://github.com/emqx/emqx-plugin-template/releases/download/"
  ).
  %% Version and release tag of the demo plugin package to download.
  -define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0").
  -define(EMQX_PLUGIN_TEMPLATE_TAG, "5.1.0").
  %% Older plugin-template releases exercised by t_legacy_plugins/1.
  %% Each entry is merged into the options passed to get_demo_plugin_package/1.
  -define(EMQX_PLUGIN_TEMPLATES_LEGACY, [
      #{
          vsn => "5.0.0",
          tag => "5.0.0",
          release_name => "emqx_plugin_template",
          app_name => emqx_plugin_template
      }
  ]).
  %% Coordinates of the Elixir demo plugin used by t_elixir_plugin/1.
  -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME, "elixir_plugin_template").
  -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_URL,
      "https://github.com/emqx/emqx-elixir-plugin/releases/download/"
  ).
  -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_VSN, "0.1.0").
  -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2").
  %% File extension of plugin packages; stripped to obtain the "name-vsn" string.
  -define(PACKAGE_SUFFIX, ".tar.gz").
  %% Evaluates BODY on NODE by wrapping it in a fun passed to erpc:call/2.
  -define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
  %% Common Test callback: everything this suite runs.
  %%
  %% Returns the two explicit groups followed by the test cases reported by
  %% emqx_common_test_helpers:all/1 for this module.
  %% The helper's result is appended with `++' so the returned list is flat,
  %% as the all/0 callback contract describes, instead of carrying the cases
  %% as a nested sub-list.
  %% NOTE(review): assumes emqx_common_test_helpers:all/1 returns a proper
  %% list of test case names - confirm against that helper.
  all() ->
      [
          {group, copy_plugin},
          {group, create_tar_copy_plugin}
      ] ++ emqx_common_test_helpers:all(?MODULE).
  %% Common Test callback: group definitions.
  %% Both groups carry the `sequence' property; `create_tar_copy_plugin'
  %% re-runs a single case of `copy_plugin'.
  groups() ->
      CopyPluginCases = [
          group_t_copy_plugin_to_a_new_node,
          group_t_copy_plugin_to_a_new_node_single_node,
          group_t_cluster_leave
      ],
      TarCopyCases = [group_t_copy_plugin_to_a_new_node],
      [
          {copy_plugin, [sequence], CopyPluginCases},
          {create_tar_copy_plugin, [sequence], TarCopyCases}
      ].
  %% Common Test callback run before each group.
  %% `create_tar_copy_plugin' prepends `{remove_tar, true}' to the config;
  %% `copy_plugin' passes the config through untouched.
  init_per_group(create_tar_copy_plugin, Config) ->
      [{remove_tar, true}] ++ Config;
  init_per_group(copy_plugin, Config) ->
      Config.
  %% Common Test callback run after each group; there is nothing to tear down.
  end_per_group(_GroupName, _CtConfig) -> ok.
  %% Common Test callback: starts the applications under test.
  %% emqx_plugins is configured with an install directory named "plugins"
  %% under the suite work dir. The started apps and the install directory
  %% are stored in the config as `suite_apps' and `install_dir'.
  init_per_suite(Config) ->
      SuiteWorkDir = emqx_cth_suite:work_dir(Config),
      PluginsDir = filename:join([SuiteWorkDir, "plugins"]),
      PluginsAppSpec =
          {emqx_plugins, #{config => #{plugins => #{install_dir => PluginsDir}}}},
      StartedApps = emqx_cth_suite:start(
          [emqx_conf, emqx_ctl, PluginsAppSpec],
          #{work_dir => SuiteWorkDir}
      ),
      ok = filelib:ensure_path(PluginsDir),
      [{suite_apps, StartedApps}, {install_dir, PluginsDir} | Config].
  %% Common Test callback: stops the applications recorded under `suite_apps'
  %% by init_per_suite/1.
  end_per_suite(Config) ->
      SuiteApps = ?config(suite_apps, Config),
      ok = emqx_cth_suite:stop(SuiteApps).
  %% Common Test callback run before every test case.
  %% Resets the configured plugin list, purges every plugin currently listed,
  %% then delegates to the case's own `{init, Config}' clause, whose return
  %% value becomes the test case config.
  init_per_testcase(TestCase, Config) ->
      emqx_plugins:put_configured([]),
      PurgeListed = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          emqx_plugins:purge(bin([Name, "-", Vsn]))
      end,
      Listed = emqx_plugins:list(),
      ok = lists:foreach(PurgeListed, Listed),
      ?MODULE:TestCase({init, Config}).
  %% Common Test callback run after every test case.
  %% Resets the configured plugin list, then delegates to the case's own
  %% `{'end', Config}' clause.
  end_per_testcase(TestCase, Config) ->
      _ = emqx_plugins:put_configured([]),
      TeardownArg = {'end', Config},
      ?MODULE:TestCase(TeardownArg).
  %% Downloads the default demo plugin package into the install directory
  %% currently reported by emqx_plugins:install_dir/0.
  get_demo_plugin_package() ->
      InstallDir = emqx_plugins:install_dir(),
      get_demo_plugin_package(InstallDir).
  %% Downloads a plugin package and stores it on disk.
  %%
  %% Given an options map with `release_name', `git_url', `vsn', `tag' and
  %% `shdir' (target directory), fetches
  %% `<git_url>/<tag>/<release_name>-<vsn>.tar.gz' and writes it to `shdir'.
  %% Returns the same map extended with `package' (path of the written file).
  %%
  %% Given anything else (a directory), downloads the default demo plugin
  %% template into that directory.
  %%
  %% Crashes with `badmatch' when the request fails, when the final HTTP
  %% status is not 200, or when the file cannot be written.
  get_demo_plugin_package(
      #{
          release_name := ReleaseName,
          git_url := GitUrl,
          vsn := PluginVsn,
          tag := ReleaseTag,
          shdir := WorkDir
      } = Opts
  ) ->
      TargetName = lists:flatten([ReleaseName, "-", PluginVsn, ?PACKAGE_SUFFIX]),
      FileURI = lists:flatten(lists:join("/", [GitUrl, ReleaseTag, TargetName])),
      %% Require a 200 status: otherwise an error page (e.g. a 404 body) would be
      %% saved as the package and only fail later, obscurely, during install.
      {ok, {{_HttpVsn, 200, _ReasonPhrase}, _Headers, PluginBin}} = httpc:request(FileURI),
      Pkg = filename:join([
          WorkDir,
          TargetName
      ]),
      ok = file:write_file(Pkg, PluginBin),
      Opts#{package => Pkg};
  get_demo_plugin_package(Dir) ->
      get_demo_plugin_package(
          #{
              release_name => ?EMQX_PLUGIN_TEMPLATE_RELEASE_NAME,
              git_url => ?EMQX_PLUGIN_TEMPLATE_URL,
              vsn => ?EMQX_PLUGIN_TEMPLATE_VSN,
              tag => ?EMQX_PLUGIN_TEMPLATE_TAG,
              shdir => Dir
          }
      ).
  %% Converts an atom, chardata list or binary to a binary.
  %% Binaries are returned unchanged; lists and atoms are encoded as UTF-8.
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Chars) when is_list(Chars) ->
      unicode:characters_to_binary(Chars, utf8);
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8).
  %% Hook points inspected by get_hook_modules/0, listed as client hooks,
  %% then session hooks, then message hooks.
  hookpoints() ->
      ClientHooks = [
          'client.connect',
          'client.connack',
          'client.connected',
          'client.disconnected',
          'client.authenticate',
          'client.authorize',
          'client.subscribe',
          'client.unsubscribe'
      ],
      SessionHooks = [
          'session.created',
          'session.subscribed',
          'session.unsubscribed',
          'session.resumed',
          'session.discarded',
          'session.takenover',
          'session.terminated'
      ],
      MessageHooks = [
          'message.publish',
          'message.puback',
          'message.delivered',
          'message.acked',
          'message.dropped'
      ],
      ClientHooks ++ SessionHooks ++ MessageHooks.
  %% Collects the callback modules registered on every hook point from
  %% hookpoints/0. Only callbacks whose action is a `{Mod, Fn, Args}' tuple
  %% contribute; a module may appear several times.
  get_hook_modules() ->
      [
          Mod
       || HookPoint <- hookpoints(),
          {callback, {Mod, _Fn, _Args}, _Filter, _Prio} <- emqx_hooks:lookup(HookPoint)
      ].
  %% Full life cycle of the demo plugin: install, start, stop and uninstall,
  %% with each step repeated to check idempotence.
  %% The `init' clause downloads the package and records its name-vsn and
  %% download options in the config.
  t_demo_install_start_stop_uninstall({init, Config}) ->
      PluginOpts = get_demo_plugin_package(),
      #{package := PackagePath} = PluginOpts,
      [
          {name_vsn, filename:basename(PackagePath, ?PACKAGE_SUFFIX)},
          {plugin_opts, PluginOpts}
          | Config
      ];
  t_demo_install_start_stop_uninstall({'end', _Config}) ->
      ok;
  t_demo_install_start_stop_uninstall(Config) ->
      Plugin = proplists:get_value(name_vsn, Config),
      #{release_name := RelName, vsn := RelVsn} = proplists:get_value(plugin_opts, Config),
      %% The plugin app and its map_sets dependency start and stop together.
      ExpectApps = fun(IsRunning) ->
          ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, IsRunning),
          ok = assert_app_running(map_sets, IsRunning)
      end,
      %% installing twice must be harmless
      ok = emqx_plugins:ensure_installed(Plugin),
      ok = emqx_plugins:ensure_installed(Plugin),
      {ok, Described} = emqx_plugins:describe(Plugin),
      ?assertEqual([maps:without([readme], Described)], emqx_plugins:list()),
      %% start, then start again using the binary form of the name
      ok = emqx_plugins:ensure_started(Plugin),
      ok = ExpectApps(true),
      ok = emqx_plugins:ensure_started(bin(Plugin)),
      ok = ExpectApps(true),
      %% a running plugin can not be un-installed
      ?assertMatch({error, _}, emqx_plugins:ensure_uninstalled(Plugin)),
      %% stop, then stop again using the binary form of the name
      ok = emqx_plugins:ensure_stopped(Plugin),
      ok = ExpectApps(false),
      ok = emqx_plugins:ensure_stopped(bin(Plugin)),
      ok = ExpectApps(false),
      %% a stopped plugin is still listed
      ExpectedName = list_to_binary(RelName),
      ExpectedVsn = list_to_binary(RelVsn),
      ?assertMatch(
          [#{<<"name">> := ExpectedName, <<"rel_vsn">> := ExpectedVsn}],
          emqx_plugins:list()
      ),
      ok = emqx_plugins:ensure_uninstalled(Plugin),
      ?assertEqual([], emqx_plugins:list()),
      ok.
  %% Helper that creates a plugin info file.
  %% Writes Content to `<install_dir>/<NameVsn>/release.json', creating the
  %% parent directory if needed. `install_dir' is read from Config.
  %% The file is JSON when produced by a real build, but since it is loaded
  %% with hocon:load, ad-hoc test files may be written in HOCON format.
  write_info_file(Config, NameVsn, Content) ->
      InstallDir = proplists:get_value(install_dir, Config),
      PluginDir = filename:join(InstallDir, NameVsn),
      InfoPath = filename:join(PluginDir, "release.json"),
      ok = filelib:ensure_dir(InfoPath),
      ok = file:write_file(InfoPath, Content).
  %% Checks that a plugin can be placed before or behind another one and
  %% that emqx_plugins:list/0 reflects the resulting order.
  t_position({init, Config}) ->
      #{package := PackagePath} = get_demo_plugin_package(),
      [{name_vsn, filename:basename(PackagePath, ?PACKAGE_SUFFIX)} | Config];
  t_position({'end', _Config}) ->
      ok;
  t_position(Config) ->
      Demo = proplists:get_value(name_vsn, Config),
      ok = emqx_plugins:ensure_installed(Demo),
      ok = emqx_plugins:ensure_enabled(Demo),
      FakeInfo =
          "name=position, rel_vsn=\"2\", rel_apps=[\"position-9\"],"
          "description=\"desc fake position app\"",
      Fake = <<"position-2">>,
      ok = write_info_file(Config, Fake, FakeInfo),
      %% register the fake plugin as disabled, in front of the demo plugin
      ok = ensure_state(Fake, {before, Demo}, false),
      ToNameVsn = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          <<Name/binary, "-", Vsn/binary>>
      end,
      ListedNameVsns = fun() -> lists:map(ToNameVsn, emqx_plugins:list()) end,
      DemoBin = list_to_binary(Demo),
      ?assertEqual([Fake, DemoBin], ListedNameVsns()),
      %% enabling it `behind' the demo plugin swaps the order
      emqx_plugins:ensure_enabled(Fake, {behind, Demo}),
      ?assertEqual([DemoBin, Fake], ListedNameVsns()),
      ok = emqx_plugins:ensure_stopped(),
      ok = emqx_plugins:ensure_disabled(Demo),
      ok = emqx_plugins:ensure_disabled(Fake),
      ok = emqx_plugins:ensure_uninstalled(Demo),
      ok = emqx_plugins:ensure_uninstalled(Fake),
      ?assertEqual([], emqx_plugins:list()),
      ok.
  %% Starts, restarts and stops all configured plugins while a second, fake
  %% plugin (`bar-2', which has only an info file) is toggled on and off.
  %% Also checks that the demo plugin's hooks follow its running state.
  t_start_restart_and_stop({init, Config}) ->
      #{package := PackagePath} = get_demo_plugin_package(),
      [{name_vsn, filename:basename(PackagePath, ?PACKAGE_SUFFIX)} | Config];
  t_start_restart_and_stop({'end', _Config}) ->
      ok;
  t_start_restart_and_stop(Config) ->
      %% Asserts whether the demo plugin module is among the hook callbacks.
      CheckHooks = fun
          (present) ->
              Mods = get_hook_modules(),
              ?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Mods), #{hooks => Mods});
          (absent) ->
              Mods = get_hook_modules(),
              ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Mods), #{hooks => Mods})
      end,
      %% pre-condition: no hooks from the demo plugin
      CheckHooks(absent),
      Demo = proplists:get_value(name_vsn, Config),
      ok = emqx_plugins:ensure_installed(Demo),
      ok = emqx_plugins:ensure_enabled(Demo),
      %% installed and enabled, but the application is not started yet
      CheckHooks(absent),
      FakeInfo =
          "name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"],"
          "description=\"desc bar\"",
      Bar = <<"bar-2">>,
      ok = write_info_file(Config, Bar, FakeInfo),
      %% register the fake plugin as disabled
      ok = ensure_state(Bar, front, false),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
      ok = emqx_plugins:ensure_started(),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
      %% the application start callback must have added the hooks
      CheckHooks(present),
      %% pretend bar-2 is enabled; starting everything must now report an error
      ok = ensure_state(Bar, rear, true),
      ?check_trace(
          emqx_plugins:ensure_started(),
          fun(Trace) ->
              ?assertMatch(
                  [#{function := _, errors := [_ | _]}],
                  ?of_kind(for_plugins_action_error_occurred, Trace)
              ),
              ok
          end
      ),
      %% the demo plugin keeps running regardless
      assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
      %% stop everything
      ok = emqx_plugins:ensure_stopped(),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
      ok = ensure_state(Bar, rear, false),
      %% the application stop callback must have removed the hooks
      CheckHooks(absent),
      %% restart twice in a row
      ok = emqx_plugins:restart(Demo),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
      ok = emqx_plugins:restart(Demo),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
      ok = emqx_plugins:ensure_stopped(),
      ok = emqx_plugins:ensure_disabled(Demo),
      ok = emqx_plugins:ensure_uninstalled(Demo),
      ok = emqx_plugins:ensure_uninstalled(Bar),
      ?assertEqual([], emqx_plugins:list()),
      ok.
  %% Runs test_legacy_plugin/2 for every entry of ?EMQX_PLUGIN_TEMPLATES_LEGACY.
  t_legacy_plugins({init, Config}) ->
      Config;
  t_legacy_plugins({'end', _Config}) ->
      ok;
  t_legacy_plugins(Config) ->
      _ = [test_legacy_plugin(Legacy, Config) || Legacy <- ?EMQX_PLUGIN_TEMPLATES_LEGACY],
      ok.
  %% Downloads one legacy plugin release into the install directory, then
  %% installs, starts, stops and uninstalls it. `app_name' names the
  %% application expected to run while the plugin is started.
  test_legacy_plugin(#{app_name := AppName} = LegacyPlugin, _Config) ->
      DownloadOpts = LegacyPlugin#{
          shdir => emqx_plugins:install_dir(), git_url => ?EMQX_PLUGIN_TEMPLATE_URL
      },
      #{package := PackagePath} = get_demo_plugin_package(DownloadOpts),
      Plugin = filename:basename(PackagePath, ?PACKAGE_SUFFIX),
      ExpectApps = fun(IsRunning) ->
          ok = assert_app_running(AppName, IsRunning),
          ok = assert_app_running(map_sets, IsRunning)
      end,
      ok = emqx_plugins:ensure_installed(Plugin),
      %% start
      ok = emqx_plugins:ensure_started(Plugin),
      ok = ExpectApps(true),
      %% stop
      ok = emqx_plugins:ensure_stopped(Plugin),
      ok = ExpectApps(false),
      ok = emqx_plugins:ensure_uninstalled(Plugin),
      ?assertEqual([], emqx_plugins:list()),
      ok.
  %% Toggles the demo plugin between enabled and disabled and checks the
  %% configured state after each step. Also checks that an enabled plugin
  %% cannot be uninstalled, and that an uninstalled one cannot be toggled.
  t_enable_disable({init, Config}) ->
      #{package := PackagePath} = get_demo_plugin_package(),
      [{name_vsn, filename:basename(PackagePath, ?PACKAGE_SUFFIX)} | Config];
  t_enable_disable({'end', Config}) ->
      Plugin = proplists:get_value(name_vsn, Config),
      ok = emqx_plugins:ensure_uninstalled(Plugin);
  t_enable_disable(Config) ->
      Plugin = proplists:get_value(name_vsn, Config),
      %% Asserts that the plugin is the only configured entry, in the given state.
      ExpectConfigured = fun(Enabled) ->
          ?assertEqual(
              [#{name_vsn => Plugin, enable => Enabled}],
              emqx_plugins:configured()
          )
      end,
      ok = emqx_plugins:ensure_installed(Plugin),
      ExpectConfigured(false),
      ok = emqx_plugins:ensure_enabled(Plugin),
      ExpectConfigured(true),
      ok = emqx_plugins:ensure_disabled(Plugin),
      ExpectConfigured(false),
      ok = emqx_plugins:ensure_enabled(bin(Plugin)),
      ExpectConfigured(true),
      %% an enabled plugin must be disabled before it can be uninstalled
      UninstallWhileEnabled = emqx_plugins:ensure_uninstalled(Plugin),
      ?assertMatch(
          {error, #{
              msg := "bad_plugin_config_status",
              hint := "disable_the_plugin_first"
          }},
          UninstallWhileEnabled
      ),
      ok = emqx_plugins:ensure_disabled(bin(Plugin)),
      ok = emqx_plugins:ensure_uninstalled(Plugin),
      %% once uninstalled, the plugin can no longer be toggled
      ?assertMatch({error, _}, emqx_plugins:ensure_enabled(Plugin)),
      ?assertMatch({error, _}, emqx_plugins:ensure_disabled(Plugin)),
      ok.
  %% Asserts that application Name is (`true') or is not (`false') among the
  %% applications returned by application:which_applications/0.
  %% Returns `ok'; raises an eunit assertion error otherwise.
  assert_app_running(Name, ShouldRun) when is_boolean(ShouldRun) ->
      Found = lists:keyfind(Name, 1, application:which_applications()),
      case ShouldRun of
          true -> ?assertMatch({Name, _, _}, Found);
          false -> ?assertEqual(false, Found)
      end.
  %% Asserts that exactly one plugin is listed, that the demo plugin
  %% application is running, and that its module is registered as a hook
  %% callback. The plugin list is logged for diagnostics.
  assert_started_and_hooks_loaded() ->
      Listed = emqx_plugins:list(),
      ct:pal("plugin config:\n ~p", [Listed]),
      ?assertMatch([_], Listed),
      assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
      HookMods = get_hook_modules(),
      IsHooked = lists:member(?EMQX_PLUGIN_APP_NAME, HookMods),
      ?assert(IsHooked, #{hooks => HookMods}),
      ok.
  %% Installing from a corrupt package, or from a package that does not
  %% exist, must fail with a descriptive error and list no plugin.
  t_bad_tar_gz({init, Config}) ->
      Config;
  t_bad_tar_gz({'end', _Config}) ->
      ok;
  t_bad_tar_gz(Config) ->
      InstallDir = proplists:get_value(install_dir, Config),
      BogusPkg = filename:join([InstallDir, "fake-vsn.tar.gz"]),
      %% not a gzip'ed tarball at all
      ok = file:write_file(BogusPkg, "a\n"),
      BogusResult = emqx_plugins:ensure_installed("fake-vsn"),
      ?assertMatch(
          {error, #{msg := "bad_plugin_package", reason := eof}},
          BogusResult
      ),
      %% the plugin tarball can not be found on any nodes
      MissingResult = emqx_plugins:ensure_installed("nonexisting"),
      ?assertMatch(
          {error, #{msg := "no_nodes_to_copy_plugin_from", reason := plugin_not_found}},
          MissingResult
      ),
      ?assertEqual([], emqx_plugins:list()),
      %% deleting the package is idempotent
      ok = emqx_plugins:delete_package("fake-vsn"),
      ok = emqx_plugins:delete_package("fake-vsn").
  %% Package built around an incomplete info file.
  %% A failed install attempt must not leave the extracted directory behind,
  %% while the .tar.gz itself stays in place.
  t_bad_tar_gz2({init, Config}) ->
      InstallDir = proplists:get_value(install_dir, Config),
      Plugin = "foo-0.2",
      %% invalid info file content: the description is missing
      IncompleteInfo = "name=foo, rel_vsn=\"0.2\", rel_apps=[foo]",
      ok = write_info_file(Config, Plugin, IncompleteInfo),
      PkgPath = filename:join([InstallDir, Plugin ++ ".tar.gz"]),
      ok = make_tar(InstallDir, Plugin),
      [{tar_gz, PkgPath}, {name_vsn, Plugin} | Config];
  t_bad_tar_gz2({'end', Config}) ->
      ok = emqx_plugins:delete_package(?config(name_vsn, Config)),
      ok;
  t_bad_tar_gz2(Config) ->
      PkgPath = ?config(tar_gz, Config),
      Plugin = ?config(name_vsn, Config),
      ?assert(filelib:is_regular(PkgPath)),
      %% the install fails and the extracted bad content is cleaned up
      ?assertMatch({error, _}, emqx_plugins:ensure_installed(Plugin)),
      ExtractedDir = emqx_plugins:plugin_dir(Plugin),
      ?assertEqual({error, enoent}, file:read_file_info(ExtractedDir)),
      %% but the tar.gz file is still around
      ?assert(filelib:is_regular(PkgPath)),
      ok.
  %% Package whose content does not match its own name-vsn.
  %% Extracted content must be cleaned up after the failed install even when
  %% it landed in a directory that does not follow the expected name-vsn
  %% pattern.
  t_tar_vsn_content_mismatch({init, Config}) ->
      InstallDir = proplists:get_value(install_dir, Config),
      Plugin = "bad_tar-0.2",
      %% the info file describes `foo-0.2', not `bad_tar-0.2'
      MismatchedInfo =
          "name=foo, rel_vsn=\"0.2\", rel_apps=[\"foo-0.2\"], description=\"lorem ipsum\"",
      ok = write_info_file(Config, "foo-0.2", MismatchedInfo),
      PkgPath = filename:join([InstallDir, "bad_tar-0.2.tar.gz"]),
      ok = make_tar(InstallDir, "foo-0.2", Plugin),
      %% remove the source info file now that it has been packaged
      _ = file:delete(filename:join([InstallDir, "foo-0.2", "release.json"])),
      [{tar_gz, PkgPath}, {name_vsn, Plugin} | Config];
  t_tar_vsn_content_mismatch({'end', Config}) ->
      ok = emqx_plugins:delete_package(?config(name_vsn, Config)),
      ok;
  t_tar_vsn_content_mismatch(Config) ->
      PkgPath = ?config(tar_gz, Config),
      Plugin = ?config(name_vsn, Config),
      ?assert(filelib:is_regular(PkgPath)),
      %% the install fails; content of the bad .tar.gz is cleaned up even
      %% if it was extracted to another directory
      ?assertMatch({error, _}, emqx_plugins:ensure_installed(Plugin)),
      ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(Plugin))),
      ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir("foo-0.2"))),
      %% the tar.gz file is still around
      ?assert(filelib:is_regular(PkgPath)),
      ok.
  %% emqx_plugins:describe/1 must reject an info file that cannot be parsed,
  %% and one that parses but lacks the mandatory fields.
  t_bad_info_json({init, Config}) ->
      Config;
  t_bad_info_json({'end', _}) ->
      ok;
  t_bad_info_json(Config) ->
      Plugin = "test-2",
      %% unparsable content
      ok = write_info_file(Config, Plugin, "bad-syntax"),
      ParseResult = emqx_plugins:describe(Plugin),
      ?assertMatch(
          {error, #{msg := "bad_info_file", reason := {parse_error, _}}},
          ParseResult
      ),
      %% parsable content without the mandatory fields
      ok = write_info_file(Config, Plugin, "{\"bad\": \"obj\"}"),
      ContentResult = emqx_plugins:describe(Plugin),
      ?assertMatch(
          {error, #{msg := "bad_info_file_content", mandatory_fields := _}},
          ContentResult
      ),
      ?assertEqual([], emqx_plugins:list()),
      emqx_plugins:purge(Plugin),
      ok.
  %% Full life cycle of the Elixir demo plugin: install, start, call into
  %% Elixir code, stop and uninstall, with each step repeated to check
  %% idempotence.
  t_elixir_plugin({init, Config}) ->
      DownloadOpts = #{
          release_name => ?EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME,
          git_url => ?EMQX_ELIXIR_PLUGIN_TEMPLATE_URL,
          vsn => ?EMQX_ELIXIR_PLUGIN_TEMPLATE_VSN,
          tag => ?EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG,
          shdir => emqx_plugins:install_dir()
      },
      PluginOpts = get_demo_plugin_package(DownloadOpts),
      #{package := PackagePath} = PluginOpts,
      [
          {name_vsn, filename:basename(PackagePath, ?PACKAGE_SUFFIX)},
          {plugin_opts, PluginOpts}
          | Config
      ];
  t_elixir_plugin({'end', _Config}) ->
      ok;
  t_elixir_plugin(Config) ->
      Plugin = proplists:get_value(name_vsn, Config),
      #{release_name := RelName, vsn := RelVsn} = proplists:get_value(plugin_opts, Config),
      %% The plugin app and its hallux dependency start and stop together.
      ExpectApps = fun(IsRunning) ->
          ok = assert_app_running(elixir_plugin_template, IsRunning),
          ok = assert_app_running(hallux, IsRunning)
      end,
      %% installing twice must be harmless
      ok = emqx_plugins:ensure_installed(Plugin),
      ok = emqx_plugins:ensure_installed(Plugin),
      {ok, PluginInfo} = emqx_plugins:read_plugin_info(Plugin, #{}),
      ?assertEqual([PluginInfo], emqx_plugins:list()),
      %% start, then start again using the binary form of the name
      ok = emqx_plugins:ensure_started(Plugin),
      ok = ExpectApps(true),
      ok = emqx_plugins:ensure_started(bin(Plugin)),
      ok = ExpectApps(true),
      %% Elixir functions are callable once the plugin is running
      1 = 'Elixir.ElixirPluginTemplate':ping(),
      3 = 'Elixir.Kernel':'+'(1, 2),
      %% a running plugin can not be un-installed
      ?assertMatch({error, _}, emqx_plugins:ensure_uninstalled(Plugin)),
      %% stop, then stop again using the binary form of the name
      ok = emqx_plugins:ensure_stopped(Plugin),
      ok = ExpectApps(false),
      ok = emqx_plugins:ensure_stopped(bin(Plugin)),
      ok = ExpectApps(false),
      %% a stopped plugin is still listed
      ExpectedName = list_to_binary(RelName),
      ExpectedVsn = list_to_binary(RelVsn),
      ?assertMatch(
          [#{<<"name">> := ExpectedName, <<"rel_vsn">> := ExpectedVsn}],
          emqx_plugins:list()
      ),
      ok = emqx_plugins:ensure_uninstalled(Plugin),
      ?assertEqual([], emqx_plugins:list()),
      ok.
  %% Checks that loading a plugins config through the `conf load' CLI
  %% disables and then re-enables a running plugin, as reported both by the
  %% config and by the management API.
  %%
  %% The scratch HOCON file is written to the Common Test `priv_dir' rather
  %% than a hard-coded "/tmp" path, so concurrent runs do not share one file
  %% and nothing is left outside the test run's own directory.
  t_load_config_from_cli({init, Config}) ->
      #{package := Package} = get_demo_plugin_package(),
      NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
      [{name_vsn, NameVsn} | Config];
  t_load_config_from_cli({'end', Config}) ->
      NameVsn = ?config(name_vsn, Config),
      ok = emqx_plugins:ensure_stopped(NameVsn),
      ok = emqx_plugins:ensure_uninstalled(NameVsn),
      ok;
  t_load_config_from_cli(Config) when is_list(Config) ->
      NameVsn = ?config(name_vsn, Config),
      ok = emqx_plugins:ensure_installed(NameVsn),
      ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
      ok = emqx_plugins:ensure_enabled(NameVsn),
      ok = emqx_plugins:ensure_started(NameVsn),
      %% the API handler is called with a placeholder for its params argument
      Params0 = unused,
      ?assertMatch(
          {200, [#{running_status := [#{status := running}]}]},
          emqx_mgmt_api_plugins:list_plugins(get, Params0)
      ),
      %% Now we disable it via CLI loading
      Conf0 = emqx_config:get([plugins]),
      ?assertMatch(
          #{states := [#{enable := true}]},
          Conf0
      ),
      #{states := [Plugin0]} = Conf0,
      Conf1 = Conf0#{states := [Plugin0#{enable := false}]},
      Filename = filename:join(
          ?config(priv_dir, Config),
          atom_to_list(?FUNCTION_NAME) ++ ".hocon"
      ),
      ok = file:write_file(Filename, hocon_pp:do(#{plugins => Conf1}, #{})),
      ok = emqx_conf_cli:conf(["load", Filename]),
      Conf2 = emqx_config:get([plugins]),
      ?assertMatch(
          #{states := [#{enable := false}]},
          Conf2
      ),
      ?assertMatch(
          {200, [#{running_status := [#{status := stopped}]}]},
          emqx_mgmt_api_plugins:list_plugins(get, Params0)
      ),
      %% Re-enable it via CLI loading
      ok = file:write_file(Filename, hocon_pp:do(#{plugins => Conf0}, #{})),
      ok = emqx_conf_cli:conf(["load", Filename]),
      Conf3 = emqx_config:get([plugins]),
      ?assertMatch(
          #{states := [#{enable := true}]},
          Conf3
      ),
      ?assertMatch(
          {200, [#{running_status := [#{status := running}]}]},
          emqx_mgmt_api_plugins:list_plugins(get, Params0)
      ),
      ok.
  %% Two-node scenario: a plugin installed, started and enabled on the
  %% "copy from" node must end up running and enabled on the "copy to" node
  %% once that node joins the cluster and restarts emqx_plugins.
  %% When the config has `remove_tar' set, the original package file is
  %% deleted on the source node first.
  group_t_copy_plugin_to_a_new_node({init, Config}) ->
      CaseWorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
      SrcDir = filename:join(CaseWorkDir, from),
      ok = filelib:ensure_path(SrcDir),
      DstDir = filename:join(CaseWorkDir, to),
      ok = filelib:ensure_path(DstDir),
      #{package := PackagePath, release_name := PluginName} = get_demo_plugin_package(SrcDir),
      NodeApps = [emqx, emqx_conf, emqx_ctl, emqx_plugins],
      NodeOpts = #{role => core, apps => NodeApps},
      [SrcSpec, DstSpec] = emqx_cth_cluster:mk_nodespecs(
          [{plugins_copy_from, NodeOpts}, {plugins_copy_to, NodeOpts}],
          #{work_dir => CaseWorkDir}
      ),
      PluginsCall = fun(Node, Fn, Args) -> rpc:call(Node, emqx_plugins, Fn, Args) end,
      %% both nodes start standalone; the join happens inside the test case
      [SrcNode] = emqx_cth_cluster:start([SrcSpec#{join_to => undefined}]),
      ok = PluginsCall(SrcNode, put_config_internal, [install_dir, SrcDir]),
      [DstNode] = emqx_cth_cluster:start([DstSpec#{join_to => undefined}]),
      ok = PluginsCall(DstNode, put_config_internal, [install_dir, DstDir]),
      NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX),
      ok = PluginsCall(SrcNode, ensure_installed, [NameVsn]),
      ok = PluginsCall(SrcNode, ensure_started, [NameVsn]),
      ok = PluginsCall(SrcNode, ensure_enabled, [NameVsn]),
      %% Optionally cover the case where a plugin is installed but its original
      %% tar file has been removed and must be re-created.
      _ =
          proplists:get_bool(remove_tar, Config) andalso
              (ok = file:delete(filename:join(SrcDir, NameVsn ++ ?PACKAGE_SUFFIX))),
      [
          {from_install_dir, SrcDir},
          {to_install_dir, DstDir},
          {copy_from_node, SrcNode},
          {copy_to_node, DstNode},
          {name_vsn, NameVsn},
          {plugin_name, PluginName}
          | Config
      ];
  group_t_copy_plugin_to_a_new_node({'end', Config}) ->
      Nodes = [?config(copy_from_node, Config), ?config(copy_to_node, Config)],
      ok = emqx_cth_cluster:stop(Nodes);
  group_t_copy_plugin_to_a_new_node(Config) ->
      SrcNode = proplists:get_value(copy_from_node, Config),
      DstNode = proplists:get_value(copy_to_node, Config),
      DstDir = proplists:get_value(to_install_dir, Config),
      SrcStates = rpc:call(SrcNode, emqx_plugins, get_config_interal, [[states], []]),
      NameVsn = proplists:get_value(name_vsn, Config),
      PluginApp = list_to_atom(proplists:get_value(plugin_name, Config)),
      RunningApps = fun(Node) -> rpc:call(Node, application, which_applications, []) end,
      %% the source node has the plugin enabled and running
      ?assertMatch([#{enable := true, name_vsn := NameVsn}], SrcStates),
      ?assert(proplists:is_defined(PluginApp, RunningApps(SrcNode))),
      ?assertEqual([], filelib:wildcard(filename:join(DstDir, "**"))),
      %% Check that a new node doesn't have this plugin before it joins the cluster
      ?assertEqual([], rpc:call(DstNode, emqx_conf, get, [[plugins, states], []])),
      ?assertMatch({error, _}, rpc:call(DstNode, emqx_plugins, describe, [NameVsn])),
      ?assertNot(proplists:is_defined(PluginApp, RunningApps(DstNode))),
      ok = rpc:call(DstNode, ekka, join, [SrcNode]),
      %% Mimic cluster-override conf copying
      ok = rpc:call(DstNode, emqx_plugins, put_config_internal, [[states], SrcStates]),
      %% Plugin copying is triggered upon app restart on a new node.
      %% This is similar to emqx_conf, which copies cluster-override conf upon start,
      %% see: emqx_conf_app:init_conf/0
      ok = rpc:call(DstNode, application, stop, [emqx_plugins]),
      {ok, _} = rpc:call(DstNode, application, ensure_all_started, [emqx_plugins]),
      %% Plugin config should be synced from the source node
      %% by application `emqx` and `emqx_conf`
      %% FIXME: in test case, we manually do it here
      ok = rpc:call(DstNode, emqx_plugins, put_config_internal, [[states], SrcStates]),
      ok = rpc:call(DstNode, emqx_plugins, ensure_installed, []),
      ok = rpc:call(DstNode, emqx_plugins, ensure_started, []),
      ?assertMatch(
          {ok, #{running_status := running, config_status := enabled}},
          rpc:call(DstNode, emqx_plugins, describe, [NameVsn])
      ).
  %% Checks that a cluster consisting of a single node can be started with a
  %% plugin that is configured as enabled but has only its package on disk.
  group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
      CaseWorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
      %% start from an empty directory; it may not exist yet, so ignore the result
      _ = file:del_dir_r(CaseWorkDir),
      ok = filelib:ensure_path(CaseWorkDir),
      #{package := PackagePath, release_name := PluginName} =
          get_demo_plugin_package(CaseWorkDir),
      NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX),
      PluginsConf = #{
          install_dir => CaseWorkDir,
          states => [#{name_vsn => NameVsn, enable => true}]
      },
      NodeApps = [
          emqx,
          emqx_conf,
          emqx_ctl,
          {emqx_plugins, #{config => #{plugins => PluginsConf}}}
      ],
      [Node] = emqx_cth_cluster:start(
          [{plugins_copy_to, #{role => core, apps => NodeApps}}],
          #{work_dir => CaseWorkDir}
      ),
      [
          {to_install_dir, CaseWorkDir},
          {copy_to_node, Node},
          {name_vsn, NameVsn},
          {plugin_name, PluginName}
          | Config
      ];
  group_t_copy_plugin_to_a_new_node_single_node({'end', Config}) ->
      Node = proplists:get_value(copy_to_node, Config),
      ok = emqx_cth_cluster:stop([Node]);
  group_t_copy_plugin_to_a_new_node_single_node(Config) ->
      Node = ?config(copy_to_node, Config),
      InstallDir = ?config(to_install_dir, Config),
      NameVsn = proplists:get_value(name_vsn, Config),
      %% Start the node for the first time. The plugin should start
      %% successfully even if it's not extracted yet. Simply starting
      %% the node would crash if not working properly.
      PluginsConf = erpc:call(Node, emqx_plugins, get_config_interal, [[], #{}]),
      ct:pal("~p config:\n ~p", [Node, PluginsConf]),
      DirListing = erpc:call(Node, file, list_dir, [InstallDir]),
      ct:pal("~p install_dir:\n ~p", [Node, DirListing]),
      %% Plugin config should be synced by application `emqx` and `emqx_conf`
      %% FIXME: in test case, we manually do it here
      States = [#{enable => true, name_vsn => NameVsn}],
      ok = rpc:call(Node, emqx_plugins, put_config_internal, [[states], States]),
      ok = rpc:call(Node, emqx_plugins, ensure_installed, []),
      ok = rpc:call(Node, emqx_plugins, ensure_started, []),
      ?assertMatch(
          {ok, #{running_status := running, config_status := enabled}},
          rpc:call(Node, emqx_plugins, describe, [NameVsn])
      ),
      ok.
  772. group_t_cluster_leave({init, Config}) ->
  773. Specs = emqx_cth_cluster:mk_nodespecs(
  774. [
  775. {group_t_cluster_leave1, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}},
  776. {group_t_cluster_leave2, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}}
  777. ],
  778. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  779. ),
  780. Nodes = emqx_cth_cluster:start(Specs),
  781. InstallRelDir = "plugins_copy_to",
  782. InstallDirs = [filename:join(WD, InstallRelDir) || #{work_dir := WD} <- Specs],
  783. ok = lists:foreach(fun filelib:ensure_path/1, InstallDirs),
  784. #{package := Package, release_name := PluginName} = get_demo_plugin_package(hd(InstallDirs)),
  785. NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
  786. [{ok, _}, {ok, _}] = erpc:multicall(Nodes, emqx_cth_suite, start_app, [
  787. emqx_plugins,
  788. #{
  789. config => #{
  790. plugins => #{
  791. install_dir => InstallRelDir,
  792. states => [#{name_vsn => NameVsn, enable => true}]
  793. }
  794. }
  795. }
  796. ]),
  797. [
  798. {nodes, Nodes},
  799. {name_vsn, NameVsn},
  800. {plugin_name, PluginName}
  801. | Config
  802. ];
  803. group_t_cluster_leave({'end', Config}) ->
  804. Nodes = ?config(nodes, Config),
  805. ok = emqx_cth_cluster:stop(Nodes);
  806. group_t_cluster_leave(Config) ->
  807. [N1, N2] = ?config(nodes, Config),
  808. NameVsn = proplists:get_value(name_vsn, Config),
  809. ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
  810. ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
  811. ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
  812. ok = erpc:call(N2, emqx_plugins, ensure_installed, [NameVsn]),
  813. ok = erpc:call(N2, emqx_plugins, ensure_started, [NameVsn]),
  814. ok = erpc:call(N2, emqx_plugins, ensure_enabled, [NameVsn]),
  815. Params = unused,
  816. %% 2 nodes running
  817. ?assertMatch(
  818. {200, [#{running_status := [#{status := running}, #{status := running}]}]},
  819. erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
  820. ),
  821. ?assertMatch(
  822. {200, [#{running_status := [#{status := running}, #{status := running}]}]},
  823. erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
  824. ),
  825. %% Now, one node leaves the cluster.
  826. ok = erpc:call(N2, ekka, leave, []),
  827. %% Each node will no longer ask the plugin status to the other.
  828. ?assertMatch(
  829. {200, [#{running_status := [#{node := N1, status := running}]}]},
  830. erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
  831. ),
  832. ?assertMatch(
  833. {200, [#{running_status := [#{node := N2, status := running}]}]},
  834. erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
  835. ),
  836. ok.
  837. %% Checks that starting a node with a plugin enabled starts it correctly, and that the
  838. %% hooks added by the plugin's `application:start/2' callback are indeed in place.
  839. %% See also: https://github.com/emqx/emqx/issues/13378
  840. t_start_node_with_plugin_enabled({init, Config}) ->
  841. #{package := Package} = get_demo_plugin_package(),
  842. Basename = filename:basename(Package),
  843. NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
  844. AppSpecs = [
  845. emqx,
  846. emqx_conf,
  847. emqx_ctl,
  848. {emqx_plugins, #{
  849. config =>
  850. #{
  851. plugins =>
  852. #{
  853. install_dir => <<"plugins">>,
  854. states =>
  855. [
  856. #{
  857. enable => true,
  858. name_vsn => NameVsn
  859. }
  860. ]
  861. }
  862. }
  863. }}
  864. ],
  865. Name1 = t_cluster_start_enabled1,
  866. Name2 = t_cluster_start_enabled2,
  867. Specs = emqx_cth_cluster:mk_nodespecs(
  868. [
  869. {Name1, #{role => core, apps => AppSpecs, join_to => undefined}},
  870. {Name2, #{role => core, apps => AppSpecs, join_to => undefined}}
  871. ],
  872. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  873. ),
  874. lists:foreach(
  875. fun(#{work_dir := WorkDir}) ->
  876. Destination = filename:join([WorkDir, "plugins", Basename]),
  877. ok = filelib:ensure_dir(Destination),
  878. {ok, _} = file:copy(Package, Destination)
  879. end,
  880. Specs
  881. ),
  882. Names = [Name1, Name2],
  883. Nodes = [emqx_cth_cluster:node_name(N) || N <- Names],
  884. [
  885. {node_specs, Specs},
  886. {nodes, Nodes},
  887. {name_vsn, NameVsn}
  888. | Config
  889. ];
  890. t_start_node_with_plugin_enabled({'end', Config}) ->
  891. Nodes = ?config(nodes, Config),
  892. ok = emqx_cth_cluster:stop(Nodes),
  893. ok;
  894. t_start_node_with_plugin_enabled(Config) when is_list(Config) ->
  895. NodeSpecs = ?config(node_specs, Config),
  896. ?check_trace(
  897. #{timetrap => 10_000},
  898. begin
  899. %% Hack: we use `restart' here to disable the clean slate verification, as we
  900. %% just created and populated the `plugins' directory...
  901. [N1, N2 | _] = lists:flatmap(fun emqx_cth_cluster:restart/1, NodeSpecs),
  902. ?ON(N1, assert_started_and_hooks_loaded()),
  903. ?ON(N2, assert_started_and_hooks_loaded()),
  904. %% Now make them join.
  905. %% N.B.: We need to start autocluster so that applications are restarted in
  906. %% order, and also we need to override the config loader to emulate what
  907. %% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
  908. %% keys.
  909. ok = ?ON(N2, emqx_machine_boot:start_autocluster()),
  910. ?ON(N2, begin
  911. StartCallback0 =
  912. case ekka:env({callback, start}) of
  913. {ok, SC0} -> SC0;
  914. _ -> fun() -> ok end
  915. end,
  916. StartCallback = fun() ->
  917. ok = emqx_app:set_config_loader(emqx_cth_suite),
  918. StartCallback0()
  919. end,
  920. ekka:callback(start, StartCallback)
  921. end),
  922. {ok, {ok, _}} =
  923. ?wait_async_action(
  924. ?ON(N2, ekka:join(N1)),
  925. #{?snk_kind := "emqx_plugins_app_started"}
  926. ),
  927. ct:pal("checking N1 state"),
  928. ?ON(N1, assert_started_and_hooks_loaded()),
  929. ct:pal("checking N2 state"),
  930. ?ON(N2, assert_started_and_hooks_loaded()),
  931. ok
  932. end,
  933. []
  934. ),
  935. ok.
  936. make_tar(Cwd, NameWithVsn) ->
  937. make_tar(Cwd, NameWithVsn, NameWithVsn).
  938. make_tar(Cwd, NameWithVsn, TarfileVsn) ->
  939. {ok, OriginalCwd} = file:get_cwd(),
  940. ok = file:set_cwd(Cwd),
  941. try
  942. Files = filelib:wildcard(NameWithVsn ++ "/**"),
  943. TarFile = TarfileVsn ++ ".tar.gz",
  944. ok = erl_tar:create(TarFile, Files, [compressed])
  945. after
  946. file:set_cwd(OriginalCwd)
  947. end.
  948. ensure_state(NameVsn, Position, Enabled) ->
  949. %% NOTE: this is an internal function that is (legacy) exported in test builds only...
  950. emqx_plugins:ensure_state(NameVsn, Position, Enabled, _ConfLocation = local).