emqx_plugins.erl 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include("emqx_plugins.hrl").
  20. -ifdef(TEST).
  21. -include_lib("eunit/include/eunit.hrl").
  22. -endif.
  23. -export([
  24. ensure_installed/1,
  25. ensure_uninstalled/1,
  26. ensure_enabled/1,
  27. ensure_enabled/2,
  28. ensure_disabled/1,
  29. purge/1,
  30. delete_package/1
  31. ]).
  32. -export([
  33. ensure_started/0,
  34. ensure_started/1,
  35. ensure_stopped/0,
  36. ensure_stopped/1,
  37. restart/1,
  38. list/0,
  39. describe/1,
  40. parse_name_vsn/1
  41. ]).
  42. -export([
  43. get_config/2,
  44. put_config/2,
  45. get_tar/1
  46. ]).
  47. %% internal
  48. -export([do_ensure_started/1]).
  49. -export([
  50. install_dir/0
  51. ]).
  52. -ifdef(TEST).
  53. -compile(export_all).
  54. -compile(nowarn_export_all).
  55. -endif.
  56. %% "my_plugin-0.1.0"
  57. -type name_vsn() :: binary() | string().
  58. %% the parse result of the JSON info file
  59. -type plugin() :: map().
  60. -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
  61. %%--------------------------------------------------------------------
  62. %% APIs
  63. %%--------------------------------------------------------------------
  64. %% @doc Describe a plugin.
  65. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  66. describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
  67. %% @doc Install a .tar.gz package placed in install_dir.
  68. -spec ensure_installed(name_vsn()) -> ok | {error, any()}.
  69. ensure_installed(NameVsn) ->
  70. case read_plugin(NameVsn, #{}) of
  71. {ok, _} ->
  72. ok;
  73. {error, _} ->
  74. ok = purge(NameVsn),
  75. do_ensure_installed(NameVsn)
  76. end.
  77. do_ensure_installed(NameVsn) ->
  78. TarGz = pkg_file(NameVsn),
  79. case erl_tar:extract(TarGz, [compressed, memory]) of
  80. {ok, TarContent} ->
  81. ok = write_tar_file_content(install_dir(), TarContent),
  82. case read_plugin(NameVsn, #{}) of
  83. {ok, _} ->
  84. ok;
  85. {error, Reason} ->
  86. ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
  87. ok = delete_tar_file_content(install_dir(), TarContent),
  88. {error, Reason}
  89. end;
  90. {error, {_, enoent}} ->
  91. {error, #{
  92. reason => "failed_to_extract_plugin_package",
  93. path => TarGz,
  94. return => not_found
  95. }};
  96. {error, Reason} ->
  97. {error, #{
  98. reason => "bad_plugin_package",
  99. path => TarGz,
  100. return => Reason
  101. }}
  102. end.
  103. -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
  104. get_tar(NameVsn) ->
  105. TarGz = pkg_file(NameVsn),
  106. case file:read_file(TarGz) of
  107. {ok, Content} ->
  108. {ok, Content};
  109. {error, _} ->
  110. case maybe_create_tar(NameVsn, TarGz, install_dir()) of
  111. ok ->
  112. file:read_file(TarGz);
  113. Err ->
  114. Err
  115. end
  116. end.
  117. maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
  118. maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  119. maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
  120. case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
  121. [_ | _] = PluginFiles ->
  122. InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
  123. PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
  124. erl_tar:create(TarGzName, PluginFiles1, [compressed]);
  125. _ ->
  126. {error, plugin_not_found}
  127. end.
  128. write_tar_file_content(BaseDir, TarContent) ->
  129. lists:foreach(
  130. fun({Name, Bin}) ->
  131. Filename = filename:join(BaseDir, Name),
  132. ok = filelib:ensure_dir(Filename),
  133. ok = file:write_file(Filename, Bin)
  134. end,
  135. TarContent
  136. ).
  137. delete_tar_file_content(BaseDir, TarContent) ->
  138. lists:foreach(
  139. fun({Name, _}) ->
  140. Filename = filename:join(BaseDir, Name),
  141. case filelib:is_file(Filename) of
  142. true ->
  143. TopDirOrFile = top_dir(BaseDir, Filename),
  144. ok = file:del_dir_r(TopDirOrFile);
  145. false ->
  146. %% probably already deleted
  147. ok
  148. end
  149. end,
  150. TarContent
  151. ).
  152. top_dir(BaseDir0, DirOrFile) ->
  153. BaseDir = normalize_dir(BaseDir0),
  154. case filename:dirname(DirOrFile) of
  155. RockBottom when RockBottom =:= "/" orelse RockBottom =:= "." ->
  156. throw({out_of_bounds, DirOrFile});
  157. BaseDir ->
  158. DirOrFile;
  159. Parent ->
  160. top_dir(BaseDir, Parent)
  161. end.
  162. normalize_dir(Dir) ->
  163. %% Get rid of possible trailing slash
  164. filename:join([Dir, ""]).
  165. -ifdef(TEST).
  166. normalize_dir_test_() ->
  167. [
  168. ?_assertEqual("foo", normalize_dir("foo")),
  169. ?_assertEqual("foo", normalize_dir("foo/")),
  170. ?_assertEqual("/foo", normalize_dir("/foo")),
  171. ?_assertEqual("/foo", normalize_dir("/foo/"))
  172. ].
  173. top_dir_test_() ->
  174. [
  175. ?_assertEqual("base/foo", top_dir("base", filename:join(["base", "foo", "bar"]))),
  176. ?_assertEqual("/base/foo", top_dir("/base", filename:join(["/", "base", "foo", "bar"]))),
  177. ?_assertEqual("/base/foo", top_dir("/base/", filename:join(["/", "base", "foo", "bar"]))),
  178. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
  179. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
  180. ].
  181. -endif.
  182. %% @doc Ensure files and directories for the given plugin are being deleted.
  183. %% If a plugin is running, or enabled, an error is returned.
  184. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  185. ensure_uninstalled(NameVsn) ->
  186. case read_plugin(NameVsn, #{}) of
  187. {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
  188. {error, #{
  189. reason => "bad_plugin_running_status",
  190. hint => "stop_the_plugin_first"
  191. }};
  192. {ok, #{config_status := enabled}} ->
  193. {error, #{
  194. reason => "bad_plugin_config_status",
  195. hint => "disable_the_plugin_first"
  196. }};
  197. _ ->
  198. purge(NameVsn),
  199. ensure_delete(NameVsn)
  200. end.
  201. ensure_delete(NameVsn0) ->
  202. NameVsn = bin(NameVsn0),
  203. List = configured(),
  204. put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
  205. ok.
  206. %% @doc Ensure a plugin is enabled to the end of the plugins list.
  207. -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  208. ensure_enabled(NameVsn) ->
  209. ensure_enabled(NameVsn, no_move).
  210. %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  211. -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  212. ensure_enabled(NameVsn, Position) ->
  213. ensure_state(NameVsn, Position, true).
  214. %% @doc Ensure a plugin is disabled.
  215. -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  216. ensure_disabled(NameVsn) ->
  217. ensure_state(NameVsn, no_move, false).
  218. ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
  219. ensure_state(binary_to_list(NameVsn), Position, State);
  220. ensure_state(NameVsn, Position, State) ->
  221. case read_plugin(NameVsn, #{}) of
  222. {ok, _} ->
  223. Item = #{
  224. name_vsn => NameVsn,
  225. enable => State
  226. },
  227. tryit("ensure_state", fun() -> ensure_configured(Item, Position) end);
  228. {error, Reason} ->
  229. {error, Reason}
  230. end.
  231. ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
  232. Configured = configured(),
  233. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  234. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  235. NewConfigured =
  236. case Rear of
  237. [_ | More] when Position =:= no_move ->
  238. Front ++ [Item | More];
  239. [_ | More] ->
  240. add_new_configured(Front ++ More, Position, Item);
  241. [] ->
  242. add_new_configured(Configured, Position, Item)
  243. end,
  244. ok = put_configured(NewConfigured).
  245. add_new_configured(Configured, no_move, Item) ->
  246. %% default to rear
  247. add_new_configured(Configured, rear, Item);
  248. add_new_configured(Configured, front, Item) ->
  249. [Item | Configured];
  250. add_new_configured(Configured, rear, Item) ->
  251. Configured ++ [Item];
  252. add_new_configured(Configured, {Action, NameVsn}, Item) ->
  253. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  254. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  255. Rear =:= [] andalso
  256. throw(#{
  257. error => "position_anchor_plugin_not_configured",
  258. hint => "maybe_install_and_configure",
  259. name_vsn => NameVsn
  260. }),
  261. case Action of
  262. before ->
  263. Front ++ [Item | Rear];
  264. behind ->
  265. [Anchor | Rear0] = Rear,
  266. Front ++ [Anchor, Item | Rear0]
  267. end.
  268. %% @doc Delete the package file.
  269. -spec delete_package(name_vsn()) -> ok.
  270. delete_package(NameVsn) ->
  271. File = pkg_file(NameVsn),
  272. case file:delete(File) of
  273. ok ->
  274. ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
  275. ok;
  276. {error, enoent} ->
  277. ok;
  278. {error, Reason} ->
  279. ?SLOG(error, #{
  280. msg => "failed_to_delete_package_file",
  281. path => File,
  282. reason => Reason
  283. }),
  284. {error, Reason}
  285. end.
  286. %% @doc Delete extracted dir
  287. %% In case one lib is shared by multiple plugins.
  288. %% it might be the case that purging one plugin's install dir
  289. %% will cause deletion of loaded beams.
  290. %% It should not be a problem, because shared lib should
  291. %% reside in all the plugin install dirs.
  292. -spec purge(name_vsn()) -> ok.
  293. purge(NameVsn) ->
  294. Dir = dir(NameVsn),
  295. case file:del_dir_r(Dir) of
  296. ok ->
  297. ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
  298. {error, enoent} ->
  299. ok;
  300. {error, Reason} ->
  301. ?SLOG(error, #{
  302. msg => "failed_to_purge_plugin_dir",
  303. dir => Dir,
  304. reason => Reason
  305. }),
  306. {error, Reason}
  307. end.
  308. %% @doc Start all configured plugins are started.
  309. -spec ensure_started() -> ok.
  310. ensure_started() ->
  311. ok = for_plugins(fun ?MODULE:do_ensure_started/1).
  312. %% @doc Start a plugin from Management API or CLI.
  313. %% the input is a <name>-<vsn> string.
  314. -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  315. ensure_started(NameVsn) ->
  316. case do_ensure_started(NameVsn) of
  317. ok ->
  318. ok;
  319. {error, Reason} ->
  320. ?SLOG(alert, #{
  321. msg => "failed_to_start_plugin",
  322. reason => Reason
  323. }),
  324. {error, Reason}
  325. end.
  326. %% @doc Stop all plugins before broker stops.
  327. -spec ensure_stopped() -> ok.
  328. ensure_stopped() ->
  329. for_plugins(fun ?MODULE:ensure_stopped/1).
  330. %% @doc Stop a plugin from Management API or CLI.
  331. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  332. ensure_stopped(NameVsn) ->
  333. tryit(
  334. "stop_plugin",
  335. fun() ->
  336. Plugin = do_read_plugin(NameVsn),
  337. ensure_apps_stopped(Plugin)
  338. end
  339. ).
  340. %% @doc Stop and then start the plugin.
  341. restart(NameVsn) ->
  342. case ensure_stopped(NameVsn) of
  343. ok -> ensure_started(NameVsn);
  344. {error, Reason} -> {error, Reason}
  345. end.
  346. %% @doc List all installed plugins.
  347. %% Including the ones that are installed, but not enabled in config.
  348. -spec list() -> [plugin()].
  349. list() ->
  350. Pattern = filename:join([install_dir(), "*", "release.json"]),
  351. All = lists:filtermap(
  352. fun(JsonFile) ->
  353. case read_plugin({file, JsonFile}, #{}) of
  354. {ok, Info} ->
  355. {true, Info};
  356. {error, Reason} ->
  357. ?SLOG(warning, Reason),
  358. false
  359. end
  360. end,
  361. filelib:wildcard(Pattern)
  362. ),
  363. list(configured(), All).
  364. %% Make sure configured ones are ordered in front.
  365. list([], All) ->
  366. All;
  367. list([#{name_vsn := NameVsn} | Rest], All) ->
  368. SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
  369. bin([Name, "-", Vsn]) =/= bin(NameVsn)
  370. end,
  371. case lists:splitwith(SplitF, All) of
  372. {_, []} ->
  373. ?SLOG(warning, #{
  374. msg => "configured_plugin_not_installed",
  375. name_vsn => NameVsn
  376. }),
  377. list(Rest, All);
  378. {Front, [I | Rear]} ->
  379. [I | list(Rest, Front ++ Rear)]
  380. end.
  381. do_ensure_started(NameVsn) ->
  382. tryit(
  383. "start_plugins",
  384. fun() ->
  385. ok = ensure_exists_and_installed(NameVsn),
  386. Plugin = do_read_plugin(NameVsn),
  387. ok = load_code_start_apps(NameVsn, Plugin)
  388. end
  389. ).
  390. %% try the function, catch 'throw' exceptions as normal 'error' return
  391. %% other exceptions with stacktrace logged.
  392. tryit(WhichOp, F) ->
  393. try
  394. F()
  395. catch
  396. throw:Reason ->
  397. %% thrown exceptions are known errors
  398. %% translate to a return value without stacktrace
  399. {error, Reason};
  400. error:Reason:Stacktrace ->
  401. %% unexpected errors, log stacktrace
  402. ?SLOG(warning, #{
  403. msg => "plugin_op_failed",
  404. which_op => WhichOp,
  405. exception => Reason,
  406. stacktrace => Stacktrace
  407. }),
  408. {error, {failed, WhichOp}}
  409. end.
  410. %% read plugin info from the JSON file
  411. %% returns {ok, Info} or {error, Reason}
  412. read_plugin(NameVsn, Options) ->
  413. tryit(
  414. "read_plugin_info",
  415. fun() -> {ok, do_read_plugin(NameVsn, Options)} end
  416. ).
  417. do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
  418. do_read_plugin({file, InfoFile}, Options) ->
  419. [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
  420. case hocon:load(InfoFile, #{format => richmap}) of
  421. {ok, RichMap} ->
  422. Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
  423. Info1 = plugins_readme(NameVsn, Options, Info0),
  424. plugin_status(NameVsn, Info1);
  425. {error, Reason} ->
  426. throw(#{
  427. error => "bad_info_file",
  428. path => InfoFile,
  429. return => Reason
  430. })
  431. end;
  432. do_read_plugin(NameVsn, Options) ->
  433. do_read_plugin({file, info_file(NameVsn)}, Options).
  434. ensure_exists_and_installed(NameVsn) ->
  435. case filelib:is_dir(dir(NameVsn)) of
  436. true ->
  437. ok;
  438. false ->
  439. %% Do we have the package, but it's not extracted yet?
  440. case get_tar(NameVsn) of
  441. {ok, TarContent} ->
  442. ok = file:write_file(pkg_file(NameVsn), TarContent),
  443. ok = do_ensure_installed(NameVsn);
  444. _ ->
  445. %% If not, try to get it from the cluster.
  446. do_get_from_cluster(NameVsn)
  447. end
  448. end.
  449. do_get_from_cluster(NameVsn) ->
  450. Nodes = [N || N <- mria:running_nodes(), N /= node()],
  451. case get_from_any_node(Nodes, NameVsn, []) of
  452. {ok, TarContent} ->
  453. ok = file:write_file(pkg_file(NameVsn), TarContent),
  454. ok = do_ensure_installed(NameVsn);
  455. {error, NodeErrors} when Nodes =/= [] ->
  456. ?SLOG(error, #{
  457. msg => "failed_to_copy_plugin_from_other_nodes",
  458. name_vsn => NameVsn,
  459. node_errors => NodeErrors
  460. }),
  461. {error, plugin_not_found};
  462. {error, _} ->
  463. ?SLOG(error, #{
  464. msg => "no_nodes_to_copy_plugin_from",
  465. name_vsn => NameVsn
  466. }),
  467. {error, plugin_not_found}
  468. end.
  469. get_from_any_node([], _NameVsn, Errors) ->
  470. {error, Errors};
  471. get_from_any_node([Node | T], NameVsn, Errors) ->
  472. case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
  473. {ok, _} = Res ->
  474. Res;
  475. Err ->
  476. get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
  477. end.
  478. plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
  479. case file:read_file(readme_file(NameVsn)) of
  480. {ok, Bin} -> Info#{readme => Bin};
  481. _ -> Info#{readme => <<>>}
  482. end;
  483. plugins_readme(_NameVsn, _Options, Info) ->
  484. Info.
  485. plugin_status(NameVsn, Info) ->
  486. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  487. RunningSt =
  488. case application:get_key(AppName, vsn) of
  489. {ok, _} ->
  490. case lists:keyfind(AppName, 1, running_apps()) of
  491. {AppName, _} -> running;
  492. _ -> loaded
  493. end;
  494. undefined ->
  495. stopped
  496. end,
  497. Configured = lists:filtermap(
  498. fun(#{name_vsn := Nv, enable := St}) ->
  499. case bin(Nv) =:= bin(NameVsn) of
  500. true -> {true, St};
  501. false -> false
  502. end
  503. end,
  504. configured()
  505. ),
  506. ConfSt =
  507. case Configured of
  508. [] -> not_configured;
  509. [true] -> enabled;
  510. [false] -> disabled
  511. end,
  512. Info#{
  513. running_status => RunningSt,
  514. config_status => ConfSt
  515. }.
  516. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  517. bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
  518. bin(B) when is_binary(B) -> B.
  519. check_plugin(
  520. #{
  521. <<"name">> := Name,
  522. <<"rel_vsn">> := Vsn,
  523. <<"rel_apps">> := Apps,
  524. <<"description">> := _
  525. } = Info,
  526. NameVsn,
  527. File
  528. ) ->
  529. case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
  530. true ->
  531. try
  532. %% assert
  533. [_ | _] = Apps,
  534. %% validate if the list is all <app>-<vsn> strings
  535. lists:foreach(fun(App) -> {ok, _, _} = parse_name_vsn(App) end, Apps)
  536. catch
  537. _:_ ->
  538. throw(#{
  539. error => "bad_rel_apps",
  540. rel_apps => Apps,
  541. hint => "A non-empty string list of app_name-app_vsn format"
  542. })
  543. end,
  544. Info;
  545. false ->
  546. throw(#{
  547. error => "name_vsn_mismatch",
  548. name_vsn => NameVsn,
  549. path => File,
  550. name => Name,
  551. rel_vsn => Vsn
  552. })
  553. end;
  554. check_plugin(_What, NameVsn, File) ->
  555. throw(#{
  556. error => "bad_info_file_content",
  557. mandatory_fields => [rel_vsn, name, rel_apps, description],
  558. name_vsn => NameVsn,
  559. path => File
  560. }).
  561. load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
  562. LibDir = filename:join([install_dir(), RelNameVsn]),
  563. RunningApps = running_apps(),
  564. %% load plugin apps and beam code
  565. AppNames =
  566. lists:map(
  567. fun(AppNameVsn) ->
  568. {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
  569. EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
  570. ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
  571. AppName
  572. end,
  573. Apps
  574. ),
  575. lists:foreach(fun start_app/1, AppNames).
  576. load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
  577. case lists:keyfind(AppName, 1, RunningApps) of
  578. false ->
  579. do_load_plugin_app(AppName, Ebin);
  580. {_, Vsn} ->
  581. case bin(Vsn) =:= bin(AppVsn) of
  582. true ->
  583. %% already started on the exact version
  584. ok;
  585. false ->
  586. %% running but a different version
  587. ?SLOG(warning, #{
  588. msg => "plugin_app_already_running",
  589. name => AppName,
  590. running_vsn => Vsn,
  591. loading_vsn => AppVsn
  592. })
  593. end
  594. end.
  595. do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
  596. do_load_plugin_app(AppName, binary_to_list(Ebin));
  597. do_load_plugin_app(AppName, Ebin) ->
  598. _ = code:add_patha(Ebin),
  599. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  600. lists:foreach(
  601. fun(BeamFile) ->
  602. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  603. case code:load_file(Module) of
  604. {module, _} ->
  605. ok;
  606. {error, Reason} ->
  607. throw(#{
  608. error => "failed_to_load_plugin_beam",
  609. path => BeamFile,
  610. reason => Reason
  611. })
  612. end
  613. end,
  614. Modules
  615. ),
  616. case application:load(AppName) of
  617. ok ->
  618. ok;
  619. {error, {already_loaded, _}} ->
  620. ok;
  621. {error, Reason} ->
  622. throw(#{
  623. error => "failed_to_load_plugin_app",
  624. name => AppName,
  625. reason => Reason
  626. })
  627. end.
  628. start_app(App) ->
  629. case application:ensure_all_started(App) of
  630. {ok, Started} ->
  631. case Started =/= [] of
  632. true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
  633. false -> ok
  634. end,
  635. ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
  636. ok;
  637. {error, {ErrApp, Reason}} ->
  638. throw(#{
  639. error => "failed_to_start_plugin_app",
  640. app => App,
  641. err_app => ErrApp,
  642. reason => Reason
  643. })
  644. end.
  645. %% Stop all apps installed by the plugin package,
  646. %% but not the ones shared with others.
  647. ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
  648. %% load plugin apps and beam code
  649. AppsToStop =
  650. lists:map(
  651. fun(NameVsn) ->
  652. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  653. AppName
  654. end,
  655. Apps
  656. ),
  657. case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
  658. {ok, []} ->
  659. %% all apps stopped
  660. ok;
  661. {ok, Left} ->
  662. ?SLOG(warning, #{
  663. msg => "unabled_to_stop_plugin_apps",
  664. apps => Left,
  665. reason => "running_apps_still_depends_on_this_apps"
  666. }),
  667. ok;
  668. {error, Reason} ->
  669. {error, Reason}
  670. end.
  671. stop_apps(Apps) ->
  672. RunningApps = running_apps(),
  673. case do_stop_apps(Apps, [], RunningApps) of
  674. %% all stopped
  675. {ok, []} -> {ok, []};
  676. %% no progress
  677. {ok, Remain} when Remain =:= Apps -> {ok, Apps};
  678. %% try again
  679. {ok, Remain} -> stop_apps(Remain)
  680. end.
  681. do_stop_apps([], Remain, _AllApps) ->
  682. {ok, lists:reverse(Remain)};
  683. do_stop_apps([App | Apps], Remain, RunningApps) ->
  684. case is_needed_by_any(App, RunningApps) of
  685. true ->
  686. do_stop_apps(Apps, [App | Remain], RunningApps);
  687. false ->
  688. ok = stop_app(App),
  689. do_stop_apps(Apps, Remain, RunningApps)
  690. end.
  691. stop_app(App) ->
  692. case application:stop(App) of
  693. ok ->
  694. ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
  695. ok = unload_moudle_and_app(App);
  696. {error, {not_started, App}} ->
  697. ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
  698. ok = unload_moudle_and_app(App);
  699. {error, Reason} ->
  700. throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
  701. end.
  702. unload_moudle_and_app(App) ->
  703. case application:get_key(App, modules) of
  704. {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
  705. _ -> ok
  706. end,
  707. _ = application:unload(App),
  708. ok.
  709. is_needed_by_any(AppToStop, RunningApps) ->
  710. lists:any(
  711. fun({RunningApp, _RunningAppVsn}) ->
  712. is_needed_by(AppToStop, RunningApp)
  713. end,
  714. RunningApps
  715. ).
  716. is_needed_by(AppToStop, AppToStop) ->
  717. false;
  718. is_needed_by(AppToStop, RunningApp) ->
  719. case application:get_key(RunningApp, applications) of
  720. {ok, Deps} -> lists:member(AppToStop, Deps);
  721. undefined -> false
  722. end.
  723. put_config(Key, Value) when is_atom(Key) ->
  724. put_config([Key], Value);
  725. put_config(Path, Values) when is_list(Path) ->
  726. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  727. %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
  728. case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
  729. {ok, _} -> ok;
  730. Error -> Error
  731. end.
  732. bin_key(Map) when is_map(Map) ->
  733. maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
  734. bin_key(List = [#{} | _]) ->
  735. lists:map(fun(M) -> bin_key(M) end, List);
  736. bin_key(Term) ->
  737. Term.
  738. get_config(Key, Default) when is_atom(Key) ->
  739. get_config([Key], Default);
  740. get_config(Path, Default) ->
  741. emqx_conf:get([?CONF_ROOT | Path], Default).
  742. install_dir() -> get_config(install_dir, "").
  743. put_configured(Configured) ->
  744. ok = put_config(states, bin_key(Configured)).
  745. configured() ->
  746. get_config(states, []).
  747. for_plugins(ActionFun) ->
  748. case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
  749. [] -> ok;
  750. Errors -> erlang:error(#{function => ActionFun, errors => Errors})
  751. end.
  752. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
  753. case Fun(NameVsn) of
  754. ok -> [];
  755. {error, Reason} -> [{NameVsn, Reason}]
  756. end;
  757. for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
  758. ?SLOG(debug, #{
  759. msg => "plugin_disabled",
  760. name_vsn => NameVsn
  761. }),
  762. [].
  763. parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
  764. parse_name_vsn(binary_to_list(NameVsn));
  765. parse_name_vsn(NameVsn) when is_list(NameVsn) ->
  766. case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
  767. {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
  768. _ -> {error, "bad_name_vsn"}
  769. end.
  770. pkg_file(NameVsn) ->
  771. filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
  772. dir(NameVsn) ->
  773. filename:join([install_dir(), NameVsn]).
  774. info_file(NameVsn) ->
  775. filename:join([dir(NameVsn), "release.json"]).
  776. readme_file(NameVsn) ->
  777. filename:join([dir(NameVsn), "README.md"]).
  778. running_apps() ->
  779. lists:map(
  780. fun({N, _, V}) ->
  781. {N, V}
  782. end,
  783. application:which_applications(infinity)
  784. ).