emqx_plugins.erl 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include("emqx_plugins.hrl").
  20. -ifdef(TEST).
  21. -include_lib("eunit/include/eunit.hrl").
  22. -endif.
  23. -export([
  24. ensure_installed/1,
  25. ensure_uninstalled/1,
  26. ensure_enabled/1,
  27. ensure_enabled/2,
  28. ensure_enabled/3,
  29. ensure_disabled/1,
  30. purge/1,
  31. delete_package/1
  32. ]).
  33. -export([
  34. ensure_started/0,
  35. ensure_started/1,
  36. ensure_stopped/0,
  37. ensure_stopped/1,
  38. restart/1,
  39. list/0,
  40. describe/1,
  41. parse_name_vsn/1
  42. ]).
  43. -export([
  44. get_config/2,
  45. put_config/2,
  46. get_tar/1
  47. ]).
  48. %% `emqx_config_handler' API
  49. -export([
  50. post_config_update/5
  51. ]).
  52. %% internal
  53. -export([do_ensure_started/1]).
  54. -export([
  55. install_dir/0
  56. ]).
  57. -ifdef(TEST).
  58. -compile(export_all).
  59. -compile(nowarn_export_all).
  60. -endif.
  61. %% "my_plugin-0.1.0"
  62. -type name_vsn() :: binary() | string().
  63. %% the parse result of the JSON info file
  64. -type plugin() :: map().
  65. -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
  66. %%--------------------------------------------------------------------
  67. %% APIs
  68. %%--------------------------------------------------------------------
  69. %% @doc Describe a plugin.
  70. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  71. describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
  72. %% @doc Install a .tar.gz package placed in install_dir.
  73. -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  74. ensure_installed(NameVsn) ->
  75. case read_plugin(NameVsn, #{}) of
  76. {ok, _} ->
  77. ok;
  78. {error, _} ->
  79. ok = purge(NameVsn),
  80. do_ensure_installed(NameVsn)
  81. end.
  82. do_ensure_installed(NameVsn) ->
  83. TarGz = pkg_file(NameVsn),
  84. case erl_tar:extract(TarGz, [compressed, memory]) of
  85. {ok, TarContent} ->
  86. ok = write_tar_file_content(install_dir(), TarContent),
  87. case read_plugin(NameVsn, #{}) of
  88. {ok, _} ->
  89. ok;
  90. {error, Reason} ->
  91. ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
  92. ok = delete_tar_file_content(install_dir(), TarContent),
  93. {error, Reason}
  94. end;
  95. {error, {_, enoent}} ->
  96. {error, #{
  97. reason => "failed_to_extract_plugin_package",
  98. path => TarGz,
  99. return => not_found
  100. }};
  101. {error, Reason} ->
  102. {error, #{
  103. reason => "bad_plugin_package",
  104. path => TarGz,
  105. return => Reason
  106. }}
  107. end.
  108. -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
  109. get_tar(NameVsn) ->
  110. TarGz = pkg_file(NameVsn),
  111. case file:read_file(TarGz) of
  112. {ok, Content} ->
  113. {ok, Content};
  114. {error, _} ->
  115. case maybe_create_tar(NameVsn, TarGz, install_dir()) of
  116. ok ->
  117. file:read_file(TarGz);
  118. Err ->
  119. Err
  120. end
  121. end.
  122. maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
  123. maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  124. maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
  125. case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
  126. [_ | _] = PluginFiles ->
  127. InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
  128. PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
  129. erl_tar:create(TarGzName, PluginFiles1, [compressed]);
  130. _ ->
  131. {error, plugin_not_found}
  132. end.
  133. write_tar_file_content(BaseDir, TarContent) ->
  134. lists:foreach(
  135. fun({Name, Bin}) ->
  136. Filename = filename:join(BaseDir, Name),
  137. ok = filelib:ensure_dir(Filename),
  138. ok = file:write_file(Filename, Bin)
  139. end,
  140. TarContent
  141. ).
  142. delete_tar_file_content(BaseDir, TarContent) ->
  143. lists:foreach(
  144. fun({Name, _}) ->
  145. Filename = filename:join(BaseDir, Name),
  146. case filelib:is_file(Filename) of
  147. true ->
  148. TopDirOrFile = top_dir(BaseDir, Filename),
  149. ok = file:del_dir_r(TopDirOrFile);
  150. false ->
  151. %% probably already deleted
  152. ok
  153. end
  154. end,
  155. TarContent
  156. ).
  157. top_dir(BaseDir0, DirOrFile) ->
  158. BaseDir = normalize_dir(BaseDir0),
  159. case filename:dirname(DirOrFile) of
  160. RockBottom when RockBottom =:= "/" orelse RockBottom =:= "." ->
  161. throw({out_of_bounds, DirOrFile});
  162. BaseDir ->
  163. DirOrFile;
  164. Parent ->
  165. top_dir(BaseDir, Parent)
  166. end.
  167. normalize_dir(Dir) ->
  168. %% Get rid of possible trailing slash
  169. filename:join([Dir, ""]).
  170. -ifdef(TEST).
  171. normalize_dir_test_() ->
  172. [
  173. ?_assertEqual("foo", normalize_dir("foo")),
  174. ?_assertEqual("foo", normalize_dir("foo/")),
  175. ?_assertEqual("/foo", normalize_dir("/foo")),
  176. ?_assertEqual("/foo", normalize_dir("/foo/"))
  177. ].
  178. top_dir_test_() ->
  179. [
  180. ?_assertEqual("base/foo", top_dir("base", filename:join(["base", "foo", "bar"]))),
  181. ?_assertEqual("/base/foo", top_dir("/base", filename:join(["/", "base", "foo", "bar"]))),
  182. ?_assertEqual("/base/foo", top_dir("/base/", filename:join(["/", "base", "foo", "bar"]))),
  183. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
  184. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
  185. ].
  186. -endif.
  187. %% @doc Ensure files and directories for the given plugin are being deleted.
  188. %% If a plugin is running, or enabled, an error is returned.
  189. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  190. ensure_uninstalled(NameVsn) ->
  191. case read_plugin(NameVsn, #{}) of
  192. {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
  193. {error, #{
  194. reason => "bad_plugin_running_status",
  195. hint => "stop_the_plugin_first"
  196. }};
  197. {ok, #{config_status := enabled}} ->
  198. {error, #{
  199. reason => "bad_plugin_config_status",
  200. hint => "disable_the_plugin_first"
  201. }};
  202. _ ->
  203. purge(NameVsn),
  204. ensure_delete(NameVsn)
  205. end.
  206. ensure_delete(NameVsn0) ->
  207. NameVsn = bin(NameVsn0),
  208. List = configured(),
  209. put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
  210. ok.
  211. %% @doc Ensure a plugin is enabled to the end of the plugins list.
  212. -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  213. ensure_enabled(NameVsn) ->
  214. ensure_enabled(NameVsn, no_move).
  215. %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  216. -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  217. ensure_enabled(NameVsn, Position) ->
  218. ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
  219. -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  220. ensure_enabled(NameVsn, Position, ConfLocation) when
  221. ConfLocation =:= local; ConfLocation =:= global
  222. ->
  223. ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
  224. %% @doc Ensure a plugin is disabled.
  225. -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  226. ensure_disabled(NameVsn) ->
  227. ensure_state(NameVsn, no_move, false, _ConfLocation = local).
  228. ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
  229. ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  230. ensure_state(NameVsn, Position, State, ConfLocation) ->
  231. case read_plugin(NameVsn, #{}) of
  232. {ok, _} ->
  233. Item = #{
  234. name_vsn => NameVsn,
  235. enable => State
  236. },
  237. tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end);
  238. {error, Reason} ->
  239. {error, Reason}
  240. end.
  241. ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
  242. Configured = configured(),
  243. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  244. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  245. NewConfigured =
  246. case Rear of
  247. [_ | More] when Position =:= no_move ->
  248. Front ++ [Item | More];
  249. [_ | More] ->
  250. add_new_configured(Front ++ More, Position, Item);
  251. [] ->
  252. add_new_configured(Configured, Position, Item)
  253. end,
  254. ok = put_configured(NewConfigured, ConfLocation).
  255. add_new_configured(Configured, no_move, Item) ->
  256. %% default to rear
  257. add_new_configured(Configured, rear, Item);
  258. add_new_configured(Configured, front, Item) ->
  259. [Item | Configured];
  260. add_new_configured(Configured, rear, Item) ->
  261. Configured ++ [Item];
  262. add_new_configured(Configured, {Action, NameVsn}, Item) ->
  263. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  264. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  265. Rear =:= [] andalso
  266. throw(#{
  267. error => "position_anchor_plugin_not_configured",
  268. hint => "maybe_install_and_configure",
  269. name_vsn => NameVsn
  270. }),
  271. case Action of
  272. before ->
  273. Front ++ [Item | Rear];
  274. behind ->
  275. [Anchor | Rear0] = Rear,
  276. Front ++ [Anchor, Item | Rear0]
  277. end.
  278. %% @doc Delete the package file.
  279. -spec delete_package(name_vsn()) -> ok.
  280. delete_package(NameVsn) ->
  281. File = pkg_file(NameVsn),
  282. case file:delete(File) of
  283. ok ->
  284. ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
  285. ok;
  286. {error, enoent} ->
  287. ok;
  288. {error, Reason} ->
  289. ?SLOG(error, #{
  290. msg => "failed_to_delete_package_file",
  291. path => File,
  292. reason => Reason
  293. }),
  294. {error, Reason}
  295. end.
  296. %% @doc Delete extracted dir
  297. %% In case one lib is shared by multiple plugins.
  298. %% it might be the case that purging one plugin's install dir
  299. %% will cause deletion of loaded beams.
  300. %% It should not be a problem, because shared lib should
  301. %% reside in all the plugin install dirs.
  302. -spec purge(name_vsn()) -> ok.
  303. purge(NameVsn) ->
  304. Dir = dir(NameVsn),
  305. case file:del_dir_r(Dir) of
  306. ok ->
  307. ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
  308. {error, enoent} ->
  309. ok;
  310. {error, Reason} ->
  311. ?SLOG(error, #{
  312. msg => "failed_to_purge_plugin_dir",
  313. dir => Dir,
  314. reason => Reason
  315. }),
  316. {error, Reason}
  317. end.
  318. %% @doc Start all configured plugins are started.
  319. -spec ensure_started() -> ok.
  320. ensure_started() ->
  321. ok = for_plugins(fun ?MODULE:do_ensure_started/1).
  322. %% @doc Start a plugin from Management API or CLI.
  323. %% the input is a <name>-<vsn> string.
  324. -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  325. ensure_started(NameVsn) ->
  326. case do_ensure_started(NameVsn) of
  327. ok ->
  328. ok;
  329. {error, Reason} ->
  330. ?SLOG(alert, #{
  331. msg => "failed_to_start_plugin",
  332. reason => Reason
  333. }),
  334. {error, Reason}
  335. end.
  336. %% @doc Stop all plugins before broker stops.
  337. -spec ensure_stopped() -> ok.
  338. ensure_stopped() ->
  339. for_plugins(fun ?MODULE:ensure_stopped/1).
  340. %% @doc Stop a plugin from Management API or CLI.
  341. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  342. ensure_stopped(NameVsn) ->
  343. tryit(
  344. "stop_plugin",
  345. fun() ->
  346. Plugin = do_read_plugin(NameVsn),
  347. ensure_apps_stopped(Plugin)
  348. end
  349. ).
  350. %% @doc Stop and then start the plugin.
  351. restart(NameVsn) ->
  352. case ensure_stopped(NameVsn) of
  353. ok -> ensure_started(NameVsn);
  354. {error, Reason} -> {error, Reason}
  355. end.
  356. %% @doc List all installed plugins.
  357. %% Including the ones that are installed, but not enabled in config.
  358. -spec list() -> [plugin()].
  359. list() ->
  360. Pattern = filename:join([install_dir(), "*", "release.json"]),
  361. All = lists:filtermap(
  362. fun(JsonFile) ->
  363. case read_plugin({file, JsonFile}, #{}) of
  364. {ok, Info} ->
  365. {true, Info};
  366. {error, Reason} ->
  367. ?SLOG(warning, Reason),
  368. false
  369. end
  370. end,
  371. filelib:wildcard(Pattern)
  372. ),
  373. list(configured(), All).
  374. %% Make sure configured ones are ordered in front.
  375. list([], All) ->
  376. All;
  377. list([#{name_vsn := NameVsn} | Rest], All) ->
  378. SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
  379. bin([Name, "-", Vsn]) =/= bin(NameVsn)
  380. end,
  381. case lists:splitwith(SplitF, All) of
  382. {_, []} ->
  383. ?SLOG(warning, #{
  384. msg => "configured_plugin_not_installed",
  385. name_vsn => NameVsn
  386. }),
  387. list(Rest, All);
  388. {Front, [I | Rear]} ->
  389. [I | list(Rest, Front ++ Rear)]
  390. end.
  391. do_ensure_started(NameVsn) ->
  392. tryit(
  393. "start_plugins",
  394. fun() ->
  395. case ensure_exists_and_installed(NameVsn) of
  396. ok ->
  397. Plugin = do_read_plugin(NameVsn),
  398. ok = load_code_start_apps(NameVsn, Plugin);
  399. {error, plugin_not_found} ->
  400. ?SLOG(error, #{
  401. msg => "plugin_not_found",
  402. name_vsn => NameVsn
  403. })
  404. end
  405. end
  406. ).
  407. %% try the function, catch 'throw' exceptions as normal 'error' return
  408. %% other exceptions with stacktrace logged.
  409. tryit(WhichOp, F) ->
  410. try
  411. F()
  412. catch
  413. throw:Reason ->
  414. %% thrown exceptions are known errors
  415. %% translate to a return value without stacktrace
  416. {error, Reason};
  417. error:Reason:Stacktrace ->
  418. %% unexpected errors, log stacktrace
  419. ?SLOG(warning, #{
  420. msg => "plugin_op_failed",
  421. which_op => WhichOp,
  422. exception => Reason,
  423. stacktrace => Stacktrace
  424. }),
  425. {error, {failed, WhichOp}}
  426. end.
  427. %% read plugin info from the JSON file
  428. %% returns {ok, Info} or {error, Reason}
  429. read_plugin(NameVsn, Options) ->
  430. tryit(
  431. "read_plugin_info",
  432. fun() -> {ok, do_read_plugin(NameVsn, Options)} end
  433. ).
  434. do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
  435. do_read_plugin({file, InfoFile}, Options) ->
  436. [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
  437. case hocon:load(InfoFile, #{format => richmap}) of
  438. {ok, RichMap} ->
  439. Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
  440. Info1 = plugins_readme(NameVsn, Options, Info0),
  441. plugin_status(NameVsn, Info1);
  442. {error, Reason} ->
  443. throw(#{
  444. error => "bad_info_file",
  445. path => InfoFile,
  446. return => Reason
  447. })
  448. end;
  449. do_read_plugin(NameVsn, Options) ->
  450. do_read_plugin({file, info_file(NameVsn)}, Options).
  451. ensure_exists_and_installed(NameVsn) ->
  452. case filelib:is_dir(dir(NameVsn)) of
  453. true ->
  454. ok;
  455. false ->
  456. %% Do we have the package, but it's not extracted yet?
  457. case get_tar(NameVsn) of
  458. {ok, TarContent} ->
  459. ok = file:write_file(pkg_file(NameVsn), TarContent),
  460. ok = do_ensure_installed(NameVsn);
  461. _ ->
  462. %% If not, try to get it from the cluster.
  463. do_get_from_cluster(NameVsn)
  464. end
  465. end.
  466. do_get_from_cluster(NameVsn) ->
  467. Nodes = [N || N <- mria:running_nodes(), N /= node()],
  468. case get_from_any_node(Nodes, NameVsn, []) of
  469. {ok, TarContent} ->
  470. ok = file:write_file(pkg_file(NameVsn), TarContent),
  471. ok = do_ensure_installed(NameVsn);
  472. {error, NodeErrors} when Nodes =/= [] ->
  473. ?SLOG(error, #{
  474. msg => "failed_to_copy_plugin_from_other_nodes",
  475. name_vsn => NameVsn,
  476. node_errors => NodeErrors
  477. }),
  478. {error, plugin_not_found};
  479. {error, _} ->
  480. ?SLOG(error, #{
  481. msg => "no_nodes_to_copy_plugin_from",
  482. name_vsn => NameVsn
  483. }),
  484. {error, plugin_not_found}
  485. end.
  486. get_from_any_node([], _NameVsn, Errors) ->
  487. {error, Errors};
  488. get_from_any_node([Node | T], NameVsn, Errors) ->
  489. case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
  490. {ok, _} = Res ->
  491. Res;
  492. Err ->
  493. get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
  494. end.
  495. plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
  496. case file:read_file(readme_file(NameVsn)) of
  497. {ok, Bin} -> Info#{readme => Bin};
  498. _ -> Info#{readme => <<>>}
  499. end;
  500. plugins_readme(_NameVsn, _Options, Info) ->
  501. Info.
  502. plugin_status(NameVsn, Info) ->
  503. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  504. RunningSt =
  505. case application:get_key(AppName, vsn) of
  506. {ok, _} ->
  507. case lists:keyfind(AppName, 1, running_apps()) of
  508. {AppName, _} -> running;
  509. _ -> loaded
  510. end;
  511. undefined ->
  512. stopped
  513. end,
  514. Configured = lists:filtermap(
  515. fun(#{name_vsn := Nv, enable := St}) ->
  516. case bin(Nv) =:= bin(NameVsn) of
  517. true -> {true, St};
  518. false -> false
  519. end
  520. end,
  521. configured()
  522. ),
  523. ConfSt =
  524. case Configured of
  525. [] -> not_configured;
  526. [true] -> enabled;
  527. [false] -> disabled
  528. end,
  529. Info#{
  530. running_status => RunningSt,
  531. config_status => ConfSt
  532. }.
  533. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  534. bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
  535. bin(B) when is_binary(B) -> B.
  536. check_plugin(
  537. #{
  538. <<"name">> := Name,
  539. <<"rel_vsn">> := Vsn,
  540. <<"rel_apps">> := Apps,
  541. <<"description">> := _
  542. } = Info,
  543. NameVsn,
  544. File
  545. ) ->
  546. case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
  547. true ->
  548. try
  549. %% assert
  550. [_ | _] = Apps,
  551. %% validate if the list is all <app>-<vsn> strings
  552. lists:foreach(fun(App) -> {ok, _, _} = parse_name_vsn(App) end, Apps)
  553. catch
  554. _:_ ->
  555. throw(#{
  556. error => "bad_rel_apps",
  557. rel_apps => Apps,
  558. hint => "A non-empty string list of app_name-app_vsn format"
  559. })
  560. end,
  561. Info;
  562. false ->
  563. throw(#{
  564. error => "name_vsn_mismatch",
  565. name_vsn => NameVsn,
  566. path => File,
  567. name => Name,
  568. rel_vsn => Vsn
  569. })
  570. end;
  571. check_plugin(_What, NameVsn, File) ->
  572. throw(#{
  573. error => "bad_info_file_content",
  574. mandatory_fields => [rel_vsn, name, rel_apps, description],
  575. name_vsn => NameVsn,
  576. path => File
  577. }).
  578. load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
  579. LibDir = filename:join([install_dir(), RelNameVsn]),
  580. RunningApps = running_apps(),
  581. %% load plugin apps and beam code
  582. AppNames =
  583. lists:map(
  584. fun(AppNameVsn) ->
  585. {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
  586. EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
  587. ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
  588. AppName
  589. end,
  590. Apps
  591. ),
  592. lists:foreach(fun start_app/1, AppNames).
  593. load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
  594. case lists:keyfind(AppName, 1, RunningApps) of
  595. false ->
  596. do_load_plugin_app(AppName, Ebin);
  597. {_, Vsn} ->
  598. case bin(Vsn) =:= bin(AppVsn) of
  599. true ->
  600. %% already started on the exact version
  601. ok;
  602. false ->
  603. %% running but a different version
  604. ?SLOG(warning, #{
  605. msg => "plugin_app_already_running",
  606. name => AppName,
  607. running_vsn => Vsn,
  608. loading_vsn => AppVsn
  609. })
  610. end
  611. end.
  612. do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
  613. do_load_plugin_app(AppName, binary_to_list(Ebin));
  614. do_load_plugin_app(AppName, Ebin) ->
  615. _ = code:add_patha(Ebin),
  616. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  617. lists:foreach(
  618. fun(BeamFile) ->
  619. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  620. _ = code:purge(Module),
  621. case code:load_file(Module) of
  622. {module, _} ->
  623. ok;
  624. {error, Reason} ->
  625. throw(#{
  626. error => "failed_to_load_plugin_beam",
  627. path => BeamFile,
  628. reason => Reason
  629. })
  630. end
  631. end,
  632. Modules
  633. ),
  634. case application:load(AppName) of
  635. ok ->
  636. ok;
  637. {error, {already_loaded, _}} ->
  638. ok;
  639. {error, Reason} ->
  640. throw(#{
  641. error => "failed_to_load_plugin_app",
  642. name => AppName,
  643. reason => Reason
  644. })
  645. end.
  646. start_app(App) ->
  647. case application:ensure_all_started(App) of
  648. {ok, Started} ->
  649. case Started =/= [] of
  650. true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
  651. false -> ok
  652. end,
  653. ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
  654. ok;
  655. {error, {ErrApp, Reason}} ->
  656. throw(#{
  657. error => "failed_to_start_plugin_app",
  658. app => App,
  659. err_app => ErrApp,
  660. reason => Reason
  661. })
  662. end.
  663. %% Stop all apps installed by the plugin package,
  664. %% but not the ones shared with others.
  665. ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
  666. %% load plugin apps and beam code
  667. AppsToStop =
  668. lists:map(
  669. fun(NameVsn) ->
  670. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  671. AppName
  672. end,
  673. Apps
  674. ),
  675. case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
  676. {ok, []} ->
  677. %% all apps stopped
  678. ok;
  679. {ok, Left} ->
  680. ?SLOG(warning, #{
  681. msg => "unabled_to_stop_plugin_apps",
  682. apps => Left,
  683. reason => "running_apps_still_depends_on_this_apps"
  684. }),
  685. ok;
  686. {error, Reason} ->
  687. {error, Reason}
  688. end.
  689. stop_apps(Apps) ->
  690. RunningApps = running_apps(),
  691. case do_stop_apps(Apps, [], RunningApps) of
  692. %% all stopped
  693. {ok, []} -> {ok, []};
  694. %% no progress
  695. {ok, Remain} when Remain =:= Apps -> {ok, Apps};
  696. %% try again
  697. {ok, Remain} -> stop_apps(Remain)
  698. end.
  699. do_stop_apps([], Remain, _AllApps) ->
  700. {ok, lists:reverse(Remain)};
  701. do_stop_apps([App | Apps], Remain, RunningApps) ->
  702. case is_needed_by_any(App, RunningApps) of
  703. true ->
  704. do_stop_apps(Apps, [App | Remain], RunningApps);
  705. false ->
  706. ok = stop_app(App),
  707. do_stop_apps(Apps, Remain, RunningApps)
  708. end.
  709. stop_app(App) ->
  710. case application:stop(App) of
  711. ok ->
  712. ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
  713. ok = unload_moudle_and_app(App);
  714. {error, {not_started, App}} ->
  715. ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
  716. ok = unload_moudle_and_app(App);
  717. {error, Reason} ->
  718. throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
  719. end.
  720. unload_moudle_and_app(App) ->
  721. case application:get_key(App, modules) of
  722. {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
  723. _ -> ok
  724. end,
  725. _ = application:unload(App),
  726. ok.
  727. is_needed_by_any(AppToStop, RunningApps) ->
  728. lists:any(
  729. fun({RunningApp, _RunningAppVsn}) ->
  730. is_needed_by(AppToStop, RunningApp)
  731. end,
  732. RunningApps
  733. ).
  734. is_needed_by(AppToStop, AppToStop) ->
  735. false;
  736. is_needed_by(AppToStop, RunningApp) ->
  737. case application:get_key(RunningApp, applications) of
  738. {ok, Deps} -> lists:member(AppToStop, Deps);
  739. undefined -> false
  740. end.
  741. put_config(Key, Value) ->
  742. put_config(Key, Value, _ConfLocation = local).
  743. put_config(Key, Value, ConfLocation) when is_atom(Key) ->
  744. put_config([Key], Value, ConfLocation);
  745. put_config(Path, Values, _ConfLocation = local) when is_list(Path) ->
  746. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  747. %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
  748. case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
  749. {ok, _} -> ok;
  750. Error -> Error
  751. end;
  752. put_config(Path, Values, _ConfLocation = global) when is_list(Path) ->
  753. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  754. case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
  755. {ok, _} -> ok;
  756. Error -> Error
  757. end.
  758. bin_key(Map) when is_map(Map) ->
  759. maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
  760. bin_key(List = [#{} | _]) ->
  761. lists:map(fun(M) -> bin_key(M) end, List);
  762. bin_key(Term) ->
  763. Term.
  764. get_config(Key, Default) when is_atom(Key) ->
  765. get_config([Key], Default);
  766. get_config(Path, Default) ->
  767. emqx_conf:get([?CONF_ROOT | Path], Default).
  768. install_dir() -> get_config(install_dir, "").
  769. put_configured(Configured) ->
  770. put_configured(Configured, _ConfLocation = local).
  771. put_configured(Configured, ConfLocation) ->
  772. ok = put_config(states, bin_key(Configured), ConfLocation).
  773. configured() ->
  774. get_config(states, []).
  775. for_plugins(ActionFun) ->
  776. case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
  777. [] -> ok;
  778. Errors -> erlang:error(#{function => ActionFun, errors => Errors})
  779. end.
  780. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
  781. case Fun(NameVsn) of
  782. ok -> [];
  783. {error, Reason} -> [{NameVsn, Reason}]
  784. end;
  785. for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
  786. ?SLOG(debug, #{
  787. msg => "plugin_disabled",
  788. name_vsn => NameVsn
  789. }),
  790. [].
  791. parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
  792. parse_name_vsn(binary_to_list(NameVsn));
  793. parse_name_vsn(NameVsn) when is_list(NameVsn) ->
  794. case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
  795. {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
  796. _ -> {error, "bad_name_vsn"}
  797. end.
  798. pkg_file(NameVsn) ->
  799. filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
  800. dir(NameVsn) ->
  801. filename:join([install_dir(), NameVsn]).
  802. info_file(NameVsn) ->
  803. filename:join([dir(NameVsn), "release.json"]).
  804. readme_file(NameVsn) ->
  805. filename:join([dir(NameVsn), "README.md"]).
  806. running_apps() ->
  807. lists:map(
  808. fun({N, _, V}) ->
  809. {N, V}
  810. end,
  811. application:which_applications(infinity)
  812. ).
  813. %%--------------------------------------------------------------------
  814. %% `emqx_config_handler' API
  815. %%--------------------------------------------------------------------
  816. post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
  817. NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
  818. OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
  819. #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
  820. maps:foreach(fun enable_disable_plugin/2, Changed),
  821. ok;
  822. post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
  823. ok.
  824. enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
  825. %% errors are already logged in this fn
  826. _ = ensure_stopped(NameVsn),
  827. ok;
  828. enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
  829. %% errors are already logged in this fn
  830. _ = ensure_started(NameVsn),
  831. ok;
  832. enable_disable_plugin(_NameVsn, _Diff) ->
  833. ok.