emqx_plugins.erl 26 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include("emqx_plugins.hrl").
  20. -ifdef(TEST).
  21. -include_lib("eunit/include/eunit.hrl").
  22. -endif.
  23. -export([
  24. ensure_installed/1,
  25. ensure_uninstalled/1,
  26. ensure_enabled/1,
  27. ensure_enabled/2,
  28. ensure_disabled/1,
  29. purge/1,
  30. delete_package/1
  31. ]).
  32. -export([
  33. ensure_started/0,
  34. ensure_started/1,
  35. ensure_stopped/0,
  36. ensure_stopped/1,
  37. restart/1,
  38. list/0,
  39. describe/1,
  40. parse_name_vsn/1
  41. ]).
  42. -export([
  43. get_config/2,
  44. put_config/2,
  45. get_tar/1
  46. ]).
  47. %% internal
  48. -export([do_ensure_started/1]).
  49. -export([
  50. install_dir/0
  51. ]).
  52. -ifdef(TEST).
  53. -compile(export_all).
  54. -compile(nowarn_export_all).
  55. -endif.
  56. %% "my_plugin-0.1.0"
  57. -type name_vsn() :: binary() | string().
  58. %% the parse result of the JSON info file
  59. -type plugin() :: map().
  60. -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
  61. %%--------------------------------------------------------------------
  62. %% APIs
  63. %%--------------------------------------------------------------------
  64. %% @doc Describe a plugin.
  65. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  66. describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
  67. %% @doc Install a .tar.gz package placed in install_dir.
  68. -spec ensure_installed(name_vsn()) -> ok | {error, any()}.
  69. ensure_installed(NameVsn) ->
  70. case read_plugin(NameVsn, #{}) of
  71. {ok, _} ->
  72. ok;
  73. {error, _} ->
  74. ok = purge(NameVsn),
  75. do_ensure_installed(NameVsn)
  76. end.
  77. do_ensure_installed(NameVsn) ->
  78. TarGz = pkg_file(NameVsn),
  79. case erl_tar:extract(TarGz, [compressed, memory]) of
  80. {ok, TarContent} ->
  81. ok = write_tar_file_content(install_dir(), TarContent),
  82. case read_plugin(NameVsn, #{}) of
  83. {ok, _} ->
  84. ok;
  85. {error, Reason} ->
  86. ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
  87. ok = delete_tar_file_content(install_dir(), TarContent),
  88. {error, Reason}
  89. end;
  90. {error, {_, enoent}} ->
  91. {error, #{
  92. reason => "failed_to_extract_plugin_package",
  93. path => TarGz,
  94. return => not_found
  95. }};
  96. {error, Reason} ->
  97. {error, #{
  98. reason => "bad_plugin_package",
  99. path => TarGz,
  100. return => Reason
  101. }}
  102. end.
  103. -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
  104. get_tar(NameVsn) ->
  105. TarGz = pkg_file(NameVsn),
  106. case file:read_file(TarGz) of
  107. {ok, Content} ->
  108. {ok, Content};
  109. {error, _} ->
  110. case maybe_create_tar(NameVsn, TarGz, install_dir()) of
  111. ok ->
  112. file:read_file(TarGz);
  113. Err ->
  114. Err
  115. end
  116. end.
  117. maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
  118. maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  119. maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
  120. case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
  121. [_ | _] = PluginFiles ->
  122. InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
  123. PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
  124. erl_tar:create(TarGzName, PluginFiles1, [compressed]);
  125. _ ->
  126. {error, plugin_not_found}
  127. end.
  128. write_tar_file_content(BaseDir, TarContent) ->
  129. lists:foreach(
  130. fun({Name, Bin}) ->
  131. Filename = filename:join(BaseDir, Name),
  132. ok = filelib:ensure_dir(Filename),
  133. ok = file:write_file(Filename, Bin)
  134. end,
  135. TarContent
  136. ).
  137. delete_tar_file_content(BaseDir, TarContent) ->
  138. lists:foreach(
  139. fun({Name, _}) ->
  140. Filename = filename:join(BaseDir, Name),
  141. case filelib:is_file(Filename) of
  142. true ->
  143. TopDirOrFile = top_dir(BaseDir, Filename),
  144. ok = file:del_dir_r(TopDirOrFile);
  145. false ->
  146. %% probably already deleted
  147. ok
  148. end
  149. end,
  150. TarContent
  151. ).
  152. top_dir(BaseDir0, DirOrFile) ->
  153. BaseDir = normalize_dir(BaseDir0),
  154. case filename:dirname(DirOrFile) of
  155. RockBottom when RockBottom =:= "/" orelse RockBottom =:= "." ->
  156. throw({out_of_bounds, DirOrFile});
  157. BaseDir ->
  158. DirOrFile;
  159. Parent ->
  160. top_dir(BaseDir, Parent)
  161. end.
  162. normalize_dir(Dir) ->
  163. %% Get rid of possible trailing slash
  164. filename:join([Dir, ""]).
  165. -ifdef(TEST).
  166. normalize_dir_test_() ->
  167. [
  168. ?_assertEqual("foo", normalize_dir("foo")),
  169. ?_assertEqual("foo", normalize_dir("foo/")),
  170. ?_assertEqual("/foo", normalize_dir("/foo")),
  171. ?_assertEqual("/foo", normalize_dir("/foo/"))
  172. ].
  173. top_dir_test_() ->
  174. [
  175. ?_assertEqual("base/foo", top_dir("base", filename:join(["base", "foo", "bar"]))),
  176. ?_assertEqual("/base/foo", top_dir("/base", filename:join(["/", "base", "foo", "bar"]))),
  177. ?_assertEqual("/base/foo", top_dir("/base/", filename:join(["/", "base", "foo", "bar"]))),
  178. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
  179. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
  180. ].
  181. -endif.
  182. %% @doc Ensure files and directories for the given plugin are being deleted.
  183. %% If a plugin is running, or enabled, an error is returned.
  184. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  185. ensure_uninstalled(NameVsn) ->
  186. case read_plugin(NameVsn, #{}) of
  187. {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
  188. {error, #{
  189. reason => "bad_plugin_running_status",
  190. hint => "stop_the_plugin_first"
  191. }};
  192. {ok, #{config_status := enabled}} ->
  193. {error, #{
  194. reason => "bad_plugin_config_status",
  195. hint => "disable_the_plugin_first"
  196. }};
  197. _ ->
  198. purge(NameVsn),
  199. ensure_delete(NameVsn)
  200. end.
  201. ensure_delete(NameVsn0) ->
  202. NameVsn = bin(NameVsn0),
  203. List = configured(),
  204. put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
  205. ok.
  206. %% @doc Ensure a plugin is enabled to the end of the plugins list.
  207. -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  208. ensure_enabled(NameVsn) ->
  209. ensure_enabled(NameVsn, no_move).
  210. %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  211. -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  212. ensure_enabled(NameVsn, Position) ->
  213. ensure_state(NameVsn, Position, true).
  214. %% @doc Ensure a plugin is disabled.
  215. -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  216. ensure_disabled(NameVsn) ->
  217. ensure_state(NameVsn, no_move, false).
  218. ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
  219. ensure_state(binary_to_list(NameVsn), Position, State);
  220. ensure_state(NameVsn, Position, State) ->
  221. case read_plugin(NameVsn, #{}) of
  222. {ok, _} ->
  223. Item = #{
  224. name_vsn => NameVsn,
  225. enable => State
  226. },
  227. tryit("ensure_state", fun() -> ensure_configured(Item, Position) end);
  228. {error, Reason} ->
  229. {error, Reason}
  230. end.
  231. ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
  232. Configured = configured(),
  233. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  234. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  235. NewConfigured =
  236. case Rear of
  237. [_ | More] when Position =:= no_move ->
  238. Front ++ [Item | More];
  239. [_ | More] ->
  240. add_new_configured(Front ++ More, Position, Item);
  241. [] ->
  242. add_new_configured(Configured, Position, Item)
  243. end,
  244. ok = put_configured(NewConfigured).
  245. add_new_configured(Configured, no_move, Item) ->
  246. %% default to rear
  247. add_new_configured(Configured, rear, Item);
  248. add_new_configured(Configured, front, Item) ->
  249. [Item | Configured];
  250. add_new_configured(Configured, rear, Item) ->
  251. Configured ++ [Item];
  252. add_new_configured(Configured, {Action, NameVsn}, Item) ->
  253. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  254. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  255. Rear =:= [] andalso
  256. throw(#{
  257. error => "position_anchor_plugin_not_configured",
  258. hint => "maybe_install_and_configure",
  259. name_vsn => NameVsn
  260. }),
  261. case Action of
  262. before ->
  263. Front ++ [Item | Rear];
  264. behind ->
  265. [Anchor | Rear0] = Rear,
  266. Front ++ [Anchor, Item | Rear0]
  267. end.
  268. %% @doc Delete the package file.
  269. -spec delete_package(name_vsn()) -> ok.
  270. delete_package(NameVsn) ->
  271. File = pkg_file(NameVsn),
  272. case file:delete(File) of
  273. ok ->
  274. ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
  275. ok;
  276. {error, enoent} ->
  277. ok;
  278. {error, Reason} ->
  279. ?SLOG(error, #{
  280. msg => "failed_to_delete_package_file",
  281. path => File,
  282. reason => Reason
  283. }),
  284. {error, Reason}
  285. end.
  286. %% @doc Delete extracted dir
  287. %% In case one lib is shared by multiple plugins.
  288. %% it might be the case that purging one plugin's install dir
  289. %% will cause deletion of loaded beams.
  290. %% It should not be a problem, because shared lib should
  291. %% reside in all the plugin install dirs.
  292. -spec purge(name_vsn()) -> ok.
  293. purge(NameVsn) ->
  294. Dir = dir(NameVsn),
  295. case file:del_dir_r(Dir) of
  296. ok ->
  297. ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
  298. {error, enoent} ->
  299. ok;
  300. {error, Reason} ->
  301. ?SLOG(error, #{
  302. msg => "failed_to_purge_plugin_dir",
  303. dir => Dir,
  304. reason => Reason
  305. }),
  306. {error, Reason}
  307. end.
  308. %% @doc Start all configured plugins are started.
  309. -spec ensure_started() -> ok.
  310. ensure_started() ->
  311. ok = for_plugins(fun ?MODULE:do_ensure_started/1).
  312. %% @doc Start a plugin from Management API or CLI.
  313. %% the input is a <name>-<vsn> string.
  314. -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  315. ensure_started(NameVsn) ->
  316. case do_ensure_started(NameVsn) of
  317. ok ->
  318. ok;
  319. {error, Reason} ->
  320. ?SLOG(alert, #{
  321. msg => "failed_to_start_plugin",
  322. reason => Reason
  323. }),
  324. {error, Reason}
  325. end.
  326. %% @doc Stop all plugins before broker stops.
  327. -spec ensure_stopped() -> ok.
  328. ensure_stopped() ->
  329. for_plugins(fun ?MODULE:ensure_stopped/1).
  330. %% @doc Stop a plugin from Management API or CLI.
  331. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  332. ensure_stopped(NameVsn) ->
  333. tryit(
  334. "stop_plugin",
  335. fun() ->
  336. Plugin = do_read_plugin(NameVsn),
  337. ensure_apps_stopped(Plugin)
  338. end
  339. ).
  340. %% @doc Stop and then start the plugin.
  341. restart(NameVsn) ->
  342. case ensure_stopped(NameVsn) of
  343. ok -> ensure_started(NameVsn);
  344. {error, Reason} -> {error, Reason}
  345. end.
  346. %% @doc List all installed plugins.
  347. %% Including the ones that are installed, but not enabled in config.
  348. -spec list() -> [plugin()].
  349. list() ->
  350. Pattern = filename:join([install_dir(), "*", "release.json"]),
  351. All = lists:filtermap(
  352. fun(JsonFile) ->
  353. case read_plugin({file, JsonFile}, #{}) of
  354. {ok, Info} ->
  355. {true, Info};
  356. {error, Reason} ->
  357. ?SLOG(warning, Reason),
  358. false
  359. end
  360. end,
  361. filelib:wildcard(Pattern)
  362. ),
  363. list(configured(), All).
  364. %% Make sure configured ones are ordered in front.
  365. list([], All) ->
  366. All;
  367. list([#{name_vsn := NameVsn} | Rest], All) ->
  368. SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
  369. bin([Name, "-", Vsn]) =/= bin(NameVsn)
  370. end,
  371. case lists:splitwith(SplitF, All) of
  372. {_, []} ->
  373. ?SLOG(warning, #{
  374. msg => "configured_plugin_not_installed",
  375. name_vsn => NameVsn
  376. }),
  377. list(Rest, All);
  378. {Front, [I | Rear]} ->
  379. [I | list(Rest, Front ++ Rear)]
  380. end.
  381. do_ensure_started(NameVsn) ->
  382. tryit(
  383. "start_plugins",
  384. fun() ->
  385. ok = ensure_exists_and_installed(NameVsn),
  386. Plugin = do_read_plugin(NameVsn),
  387. ok = load_code_start_apps(NameVsn, Plugin)
  388. end
  389. ).
  390. %% try the function, catch 'throw' exceptions as normal 'error' return
  391. %% other exceptions with stacktrace logged.
  392. tryit(WhichOp, F) ->
  393. try
  394. F()
  395. catch
  396. throw:Reason ->
  397. %% thrown exceptions are known errors
  398. %% translate to a return value without stacktrace
  399. {error, Reason};
  400. error:Reason:Stacktrace ->
  401. %% unexpected errors, log stacktrace
  402. ?SLOG(warning, #{
  403. msg => "plugin_op_failed",
  404. which_op => WhichOp,
  405. exception => Reason,
  406. stacktrace => Stacktrace
  407. }),
  408. {error, {failed, WhichOp}}
  409. end.
  410. %% read plugin info from the JSON file
  411. %% returns {ok, Info} or {error, Reason}
  412. read_plugin(NameVsn, Options) ->
  413. tryit(
  414. "read_plugin_info",
  415. fun() -> {ok, do_read_plugin(NameVsn, Options)} end
  416. ).
  417. do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
  418. do_read_plugin({file, InfoFile}, Options) ->
  419. [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
  420. case hocon:load(InfoFile, #{format => richmap}) of
  421. {ok, RichMap} ->
  422. Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
  423. Info1 = plugins_readme(NameVsn, Options, Info0),
  424. plugin_status(NameVsn, Info1);
  425. {error, Reason} ->
  426. throw(#{
  427. error => "bad_info_file",
  428. path => InfoFile,
  429. return => Reason
  430. })
  431. end;
  432. do_read_plugin(NameVsn, Options) ->
  433. do_read_plugin({file, info_file(NameVsn)}, Options).
  434. ensure_exists_and_installed(NameVsn) ->
  435. case filelib:is_dir(dir(NameVsn)) of
  436. true ->
  437. ok;
  438. _ ->
  439. Nodes = [N || N <- mria:running_nodes(), N /= node()],
  440. case get_from_any_node(Nodes, NameVsn, []) of
  441. {ok, TarContent} ->
  442. ok = file:write_file(pkg_file(NameVsn), TarContent),
  443. ok = do_ensure_installed(NameVsn);
  444. {error, NodeErrors} ->
  445. ?SLOG(error, #{
  446. msg => "failed_to_copy_plugin_from_other_nodes",
  447. name_vsn => NameVsn,
  448. node_errors => NodeErrors
  449. }),
  450. {error, plugin_not_found}
  451. end
  452. end.
  453. get_from_any_node([], _NameVsn, Errors) ->
  454. {error, Errors};
  455. get_from_any_node([Node | T], NameVsn, Errors) ->
  456. case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
  457. {ok, _} = Res ->
  458. Res;
  459. Err ->
  460. get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
  461. end.
  462. plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
  463. case file:read_file(readme_file(NameVsn)) of
  464. {ok, Bin} -> Info#{readme => Bin};
  465. _ -> Info#{readme => <<>>}
  466. end;
  467. plugins_readme(_NameVsn, _Options, Info) ->
  468. Info.
  469. plugin_status(NameVsn, Info) ->
  470. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  471. RunningSt =
  472. case application:get_key(AppName, vsn) of
  473. {ok, _} ->
  474. case lists:keyfind(AppName, 1, running_apps()) of
  475. {AppName, _} -> running;
  476. _ -> loaded
  477. end;
  478. undefined ->
  479. stopped
  480. end,
  481. Configured = lists:filtermap(
  482. fun(#{name_vsn := Nv, enable := St}) ->
  483. case bin(Nv) =:= bin(NameVsn) of
  484. true -> {true, St};
  485. false -> false
  486. end
  487. end,
  488. configured()
  489. ),
  490. ConfSt =
  491. case Configured of
  492. [] -> not_configured;
  493. [true] -> enabled;
  494. [false] -> disabled
  495. end,
  496. Info#{
  497. running_status => RunningSt,
  498. config_status => ConfSt
  499. }.
  500. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  501. bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
  502. bin(B) when is_binary(B) -> B.
  503. check_plugin(
  504. #{
  505. <<"name">> := Name,
  506. <<"rel_vsn">> := Vsn,
  507. <<"rel_apps">> := Apps,
  508. <<"description">> := _
  509. } = Info,
  510. NameVsn,
  511. File
  512. ) ->
  513. case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
  514. true ->
  515. try
  516. %% assert
  517. [_ | _] = Apps,
  518. %% validate if the list is all <app>-<vsn> strings
  519. lists:foreach(fun(App) -> {ok, _, _} = parse_name_vsn(App) end, Apps)
  520. catch
  521. _:_ ->
  522. throw(#{
  523. error => "bad_rel_apps",
  524. rel_apps => Apps,
  525. hint => "A non-empty string list of app_name-app_vsn format"
  526. })
  527. end,
  528. Info;
  529. false ->
  530. throw(#{
  531. error => "name_vsn_mismatch",
  532. name_vsn => NameVsn,
  533. path => File,
  534. name => Name,
  535. rel_vsn => Vsn
  536. })
  537. end;
  538. check_plugin(_What, NameVsn, File) ->
  539. throw(#{
  540. error => "bad_info_file_content",
  541. mandatory_fields => [rel_vsn, name, rel_apps, description],
  542. name_vsn => NameVsn,
  543. path => File
  544. }).
  545. load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
  546. LibDir = filename:join([install_dir(), RelNameVsn]),
  547. RunningApps = running_apps(),
  548. %% load plugin apps and beam code
  549. AppNames =
  550. lists:map(
  551. fun(AppNameVsn) ->
  552. {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
  553. EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
  554. ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
  555. AppName
  556. end,
  557. Apps
  558. ),
  559. lists:foreach(fun start_app/1, AppNames).
  560. load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
  561. case lists:keyfind(AppName, 1, RunningApps) of
  562. false ->
  563. do_load_plugin_app(AppName, Ebin);
  564. {_, Vsn} ->
  565. case bin(Vsn) =:= bin(AppVsn) of
  566. true ->
  567. %% already started on the exact version
  568. ok;
  569. false ->
  570. %% running but a different version
  571. ?SLOG(warning, #{
  572. msg => "plugin_app_already_running",
  573. name => AppName,
  574. running_vsn => Vsn,
  575. loading_vsn => AppVsn
  576. })
  577. end
  578. end.
  579. do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
  580. do_load_plugin_app(AppName, binary_to_list(Ebin));
  581. do_load_plugin_app(AppName, Ebin) ->
  582. _ = code:add_patha(Ebin),
  583. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  584. lists:foreach(
  585. fun(BeamFile) ->
  586. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  587. case code:load_file(Module) of
  588. {module, _} ->
  589. ok;
  590. {error, Reason} ->
  591. throw(#{
  592. error => "failed_to_load_plugin_beam",
  593. path => BeamFile,
  594. reason => Reason
  595. })
  596. end
  597. end,
  598. Modules
  599. ),
  600. case application:load(AppName) of
  601. ok ->
  602. ok;
  603. {error, {already_loaded, _}} ->
  604. ok;
  605. {error, Reason} ->
  606. throw(#{
  607. error => "failed_to_load_plugin_app",
  608. name => AppName,
  609. reason => Reason
  610. })
  611. end.
  612. start_app(App) ->
  613. case application:ensure_all_started(App) of
  614. {ok, Started} ->
  615. case Started =/= [] of
  616. true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
  617. false -> ok
  618. end,
  619. ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
  620. ok;
  621. {error, {ErrApp, Reason}} ->
  622. throw(#{
  623. error => "failed_to_start_plugin_app",
  624. app => App,
  625. err_app => ErrApp,
  626. reason => Reason
  627. })
  628. end.
  629. %% Stop all apps installed by the plugin package,
  630. %% but not the ones shared with others.
  631. ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
  632. %% load plugin apps and beam code
  633. AppsToStop =
  634. lists:map(
  635. fun(NameVsn) ->
  636. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  637. AppName
  638. end,
  639. Apps
  640. ),
  641. case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
  642. {ok, []} ->
  643. %% all apps stopped
  644. ok;
  645. {ok, Left} ->
  646. ?SLOG(warning, #{
  647. msg => "unabled_to_stop_plugin_apps",
  648. apps => Left,
  649. reason => "running_apps_still_depends_on_this_apps"
  650. }),
  651. ok;
  652. {error, Reason} ->
  653. {error, Reason}
  654. end.
  655. stop_apps(Apps) ->
  656. RunningApps = running_apps(),
  657. case do_stop_apps(Apps, [], RunningApps) of
  658. %% all stopped
  659. {ok, []} -> {ok, []};
  660. %% no progress
  661. {ok, Remain} when Remain =:= Apps -> {ok, Apps};
  662. %% try again
  663. {ok, Remain} -> stop_apps(Remain)
  664. end.
  665. do_stop_apps([], Remain, _AllApps) ->
  666. {ok, lists:reverse(Remain)};
  667. do_stop_apps([App | Apps], Remain, RunningApps) ->
  668. case is_needed_by_any(App, RunningApps) of
  669. true ->
  670. do_stop_apps(Apps, [App | Remain], RunningApps);
  671. false ->
  672. ok = stop_app(App),
  673. do_stop_apps(Apps, Remain, RunningApps)
  674. end.
  675. stop_app(App) ->
  676. case application:stop(App) of
  677. ok ->
  678. ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
  679. ok = unload_moudle_and_app(App);
  680. {error, {not_started, App}} ->
  681. ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
  682. ok = unload_moudle_and_app(App);
  683. {error, Reason} ->
  684. throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
  685. end.
  686. unload_moudle_and_app(App) ->
  687. case application:get_key(App, modules) of
  688. {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
  689. _ -> ok
  690. end,
  691. _ = application:unload(App),
  692. ok.
  693. is_needed_by_any(AppToStop, RunningApps) ->
  694. lists:any(
  695. fun({RunningApp, _RunningAppVsn}) ->
  696. is_needed_by(AppToStop, RunningApp)
  697. end,
  698. RunningApps
  699. ).
  700. is_needed_by(AppToStop, AppToStop) ->
  701. false;
  702. is_needed_by(AppToStop, RunningApp) ->
  703. case application:get_key(RunningApp, applications) of
  704. {ok, Deps} -> lists:member(AppToStop, Deps);
  705. undefined -> false
  706. end.
  707. put_config(Key, Value) when is_atom(Key) ->
  708. put_config([Key], Value);
  709. put_config(Path, Values) when is_list(Path) ->
  710. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  711. %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
  712. case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
  713. {ok, _} -> ok;
  714. Error -> Error
  715. end.
  716. bin_key(Map) when is_map(Map) ->
  717. maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
  718. bin_key(List = [#{} | _]) ->
  719. lists:map(fun(M) -> bin_key(M) end, List);
  720. bin_key(Term) ->
  721. Term.
  722. get_config(Key, Default) when is_atom(Key) ->
  723. get_config([Key], Default);
  724. get_config(Path, Default) ->
  725. emqx_conf:get([?CONF_ROOT | Path], Default).
  726. install_dir() -> get_config(install_dir, "").
  727. put_configured(Configured) ->
  728. ok = put_config(states, bin_key(Configured)).
  729. configured() ->
  730. get_config(states, []).
  731. for_plugins(ActionFun) ->
  732. case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
  733. [] -> ok;
  734. Errors -> erlang:error(#{function => ActionFun, errors => Errors})
  735. end.
  736. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
  737. case Fun(NameVsn) of
  738. ok -> [];
  739. {error, Reason} -> [{NameVsn, Reason}]
  740. end;
  741. for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
  742. ?SLOG(debug, #{
  743. msg => "plugin_disabled",
  744. name_vsn => NameVsn
  745. }),
  746. [].
  747. parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
  748. parse_name_vsn(binary_to_list(NameVsn));
  749. parse_name_vsn(NameVsn) when is_list(NameVsn) ->
  750. case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
  751. {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
  752. _ -> {error, "bad_name_vsn"}
  753. end.
  754. pkg_file(NameVsn) ->
  755. filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
  756. dir(NameVsn) ->
  757. filename:join([install_dir(), NameVsn]).
  758. info_file(NameVsn) ->
  759. filename:join([dir(NameVsn), "release.json"]).
  760. readme_file(NameVsn) ->
  761. filename:join([dir(NameVsn), "README.md"]).
  762. running_apps() ->
  763. lists:map(
  764. fun({N, _, V}) ->
  765. {N, V}
  766. end,
  767. application:which_applications(infinity)
  768. ).