emqx_plugins.erl 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include("emqx_plugins.hrl").
  19. -ifdef(TEST).
  20. -include_lib("eunit/include/eunit.hrl").
  21. -endif.
  %% Plugin description, schema/config file readers and name helpers
  -export([
      describe/1,
      plugin_avsc/1,
      plugin_i18n/1,
      plugin_avro/1,
      parse_name_vsn/1,
      make_name_vsn_string/2
  ]).

  %% Package operations
  -export([
      ensure_installed/1,
      ensure_uninstalled/1,
      ensure_enabled/1,
      ensure_enabled/2,
      ensure_enabled/3,
      ensure_disabled/1,
      purge/1,
      delete_package/1
  ]).

  %% Plugin runtime management
  -export([
      ensure_started/0,
      ensure_started/1,
      ensure_stopped/0,
      ensure_stopped/1,
      restart/1,
      list/0
  ]).

  %% Plugin config APIs
  -export([
      get_plugin_config/1,
      get_plugin_config/2,
      get_plugin_config/3,
      get_plugin_config/4,
      put_plugin_config/3
  ]).

  %% Package utils
  -export([
      decode_plugin_avro_config/2,
      install_dir/0,
      avsc_file_path/1
  ]).

  %% `emqx_config_handler' API
  -export([
      post_config_update/5
  ]).

  %% Internal export: referenced as `fun ?MODULE:do_ensure_started/1'
  -export([do_ensure_started/1]).

  %% for test cases
  -export([put_config/2]).

  %% Test builds export everything so that internals can be called directly.
  -ifdef(TEST).
  -compile(export_all).
  -compile(nowarn_export_all).
  -endif.
  %% Defines

  %% Key under which a plugin's config is kept in `persistent_term'.
  -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
  %% Values passed as the `read_mode' option of the file readers.
  -define(RAW_BIN, binary).
  -define(JSON_MAP, json_map).

  %% Plugin identifier of the form "my_plugin-0.1.0"
  -type name_vsn() :: binary() | string().
  %% the parse result of the JSON info file
  -type plugin() :: map().
  -type schema_json() :: map().
  -type i18n_json() :: map().
  -type avro_binary() :: binary().
  -type plugin_config() :: map().
  %% Where to place a plugin in the configured list when enabling it.
  -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
  89. %%--------------------------------------------------------------------
  90. %% APIs
  91. %%--------------------------------------------------------------------
  %% @doc Describe a plugin: read its info with the README content filled in
  %% under the `readme' key.
  -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  describe(NameVsn) ->
      Options = #{fill_readme => true},
      read_plugin_info(NameVsn, Options).
  %% @doc Read the plugin's avsc (schema) file.
  -spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
  plugin_avsc(NameVsn) ->
      Result = read_plugin_avsc(NameVsn),
      Result.
  %% @doc Read the plugin's i18n file.
  -spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
  plugin_i18n(NameVsn) ->
      Result = read_plugin_i18n(NameVsn),
      Result.
  %% @doc Read the plugin's avro config file as a raw binary.
  -spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
  plugin_avro(NameVsn) ->
      Result = read_plugin_avro(NameVsn),
      Result.
  %% @doc Split a `<name>-<vsn>' identifier at its first hyphen.
  %% Returns `{ok, AppName, Vsn}' with the name as an atom and the version as
  %% a string. Returns `{error, "bad_name_vsn"}' when there is no hyphen, or
  %% when the name or the version part is empty.
  %% NOTE: the name is converted with `list_to_atom/1' and atoms are never
  %% garbage collected, so only pass identifiers of real plugin packages.
  -spec parse_name_vsn(binary() | string()) -> {ok, atom(), string()} | {error, string()}.
  parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
      parse_name_vsn(binary_to_list(NameVsn));
  parse_name_vsn(NameVsn) when is_list(NameVsn) ->
      case lists:splitwith(fun(Char) -> Char =/= $- end, NameVsn) of
          %% both the name and the version must be non-empty
          {[_ | _] = AppName, [$- | [_ | _] = Vsn]} ->
              {ok, list_to_atom(AppName), Vsn};
          _ ->
              {error, "bad_name_vsn"}
      end.
  %% @doc Build the `<name>-<vsn>' identifier as a flat string.
  %% `Name' and `Vsn' may be any iodata.
  make_name_vsn_string(Name, Vsn) ->
      NameVsnBin = iolist_to_binary([Name, "-", Vsn]),
      binary_to_list(NameVsnBin).
  114. %%--------------------------------------------------------------------
  115. %% Package operations
  %% @doc Install a .tar.gz package placed in install_dir.
  %% When the plugin info can already be read nothing is done; otherwise any
  %% leftovers are purged before the package is extracted.
  -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  ensure_installed(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {error, _Reason} ->
              ok = purge(NameVsn),
              do_ensure_installed(NameVsn);
          {ok, _Info} ->
              ok
      end.
  %% @doc Ensure files and directories of the given plugin are deleted and the
  %% plugin is removed from the configured list.
  %% An error is returned if the plugin is not stopped, or is still enabled.
  -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  ensure_uninstalled(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {ok, #{running_status := Status}} when Status =/= stopped ->
              Refusal = #{
                  error_msg => "bad_plugin_running_status",
                  hint => "stop_the_plugin_first"
              },
              {error, Refusal};
          {ok, #{config_status := enabled}} ->
              Refusal = #{
                  error_msg => "bad_plugin_config_status",
                  hint => "disable_the_plugin_first"
              },
              {error, Refusal};
          _StoppedOrUnreadable ->
              %% the result of purge/1 is not checked here
              _ = purge(NameVsn),
              ensure_delete(NameVsn)
      end.
  %% @doc Ensure a plugin is enabled without moving it (`no_move').
  %% A plugin that is not configured yet is added at the end of the list.
  -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  ensure_enabled(NameVsn) ->
      Position = no_move,
      ensure_enabled(NameVsn, Position).
  %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  %% The `local' config location is used.
  -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position) ->
      Enabled = true,
      ConfLocation = local,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).
  %% @doc Ensure a plugin is enabled at the given position, writing the
  %% change to the given config location (`local' or `global').
  -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position, ConfLocation) when
      ConfLocation =:= local orelse ConfLocation =:= global
  ->
      Enabled = true,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).
  %% @doc Ensure a plugin is disabled; its position in the list is kept.
  -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  ensure_disabled(NameVsn) ->
      Enabled = false,
      ConfLocation = local,
      ensure_state(NameVsn, no_move, Enabled, ConfLocation).
  %% @doc Delete the extracted plugin dir and drop the plugin's cached config.
  %% In case one lib is shared by multiple plugins,
  %% it might be the case that purging one plugin's install dir
  %% will cause deletion of loaded beams.
  %% It should not be a problem, because shared lib should
  %% reside in all the plugin install dirs.
  %%
  %% Returns `{error, Reason}' when the directory exists but cannot be
  %% removed; a missing directory is not an error.
  -spec purge(name_vsn()) -> ok | {error, term()}.
  purge(NameVsn) ->
      _ = maybe_purge_plugin_config(NameVsn),
      purge_plugin(NameVsn).
  %% @doc Delete the package file and the plugin's schema held by
  %% `emqx_plugins_serde'.
  %% A missing package file is not an error. Any other failure to delete the
  %% file is logged and returned as `{error, Reason}'.
  -spec delete_package(name_vsn()) -> ok | {error, term()}.
  delete_package(NameVsn) ->
      File = pkg_file_path(NameVsn),
      _ = emqx_plugins_serde:delete_schema(NameVsn),
      case file:delete(File) of
          ok ->
              %% this removes the package file, not the extracted plugin dir
              ?SLOG(info, #{msg => "plugin_package_file_deleted", path => File}),
              ok;
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_delete_package_file",
                  path => File,
                  reason => Reason
              }),
              {error, Reason}
      end.
  191. %%--------------------------------------------------------------------
  192. %% Plugin runtime management
  %% @doc Ensure all configured plugins are started.
  -spec ensure_started() -> ok.
  ensure_started() ->
      StartFun = fun ?MODULE:do_ensure_started/1,
      ok = for_plugins(StartFun).
  %% @doc Start a plugin from Management API or CLI.
  %% the input is a <name>-<vsn> string.
  %% A failure is logged at `alert' level and returned unchanged.
  -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  ensure_started(NameVsn) ->
      case do_ensure_started(NameVsn) of
          ok ->
              ok;
          {error, Reason} ->
              %% The reason is a map for thrown (known) errors, but tryit/2
              %% yields `{failed, Op}' for unexpected ones; a map update on
              %% that tuple would raise `badmap'.
              LogData =
                  case is_map(Reason) of
                      true -> Reason;
                      false -> #{reason => Reason}
                  end,
              ?SLOG(alert, LogData#{msg => "failed_to_start_plugin"}),
              {error, Reason}
      end.
  %% @doc Stop all plugins before broker stops.
  -spec ensure_stopped() -> ok.
  ensure_stopped() ->
      StopFun = fun ?MODULE:ensure_stopped/1,
      for_plugins(StopFun).
  %% @doc Stop a plugin from Management API or CLI.
  %% Reads the plugin info and stops the apps it lists; exceptions are turned
  %% into `{error, _}' returns by tryit/2.
  -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  ensure_stopped(NameVsn) ->
      StopFun = fun() -> ensure_apps_stopped(do_read_plugin(NameVsn)) end,
      tryit("stop_plugin", StopFun).
  %% @doc Get a plugin's config by separate name and version.
  %% The identifier is normalised with bin/1, the same way
  %% get_plugin_config/1 does, so both entry points look up the same
  %% persistent_term key instead of a string-keyed one that never matches a
  %% binary-keyed entry.
  get_plugin_config(Name, Vsn, Options, Default) ->
      NameVsn = bin(make_name_vsn_string(Name, Vsn)),
      get_plugin_config(NameVsn, Options, Default).
  %% @doc Get a plugin's config in map format, `#{}' when none is stored.
  -spec get_plugin_config(name_vsn()) ->
      {ok, plugin_config()}
      | {error, term()}.
  get_plugin_config(NameVsn) ->
      Options = #{format => ?CONFIG_FORMAT_MAP},
      get_plugin_config(bin(NameVsn), Options).
  %% @doc Get a plugin's config in the format selected by the `format' option:
  %% the avro format reads the avro config file, the map format reads the
  %% cached config and defaults to `#{}'.
  -spec get_plugin_config(name_vsn(), Options :: map()) ->
      {ok, avro_binary() | plugin_config()}
      | {error, term()}.
  get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
      %% no default value when get raw binary config
      case read_plugin_avro(NameVsn) of
          {error, _Reason} = Failure -> Failure;
          {ok, _AvroJson} = Success -> Success
      end;
  get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP} = Options) ->
      Default = #{},
      get_plugin_config(NameVsn, Options, Default).
  %% @doc Get a plugin's map-format config from persistent_term, or `Default'
  %% when nothing is stored under the plugin's key.
  get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(NameVsn),
      Config = persistent_term:get(Key, Default),
      {ok, Config}.
  %% @doc Update plugin's config.
  %% RPC call from Management API or CLI.
  %% The avro binary and plugin config must ALWAYS be valid before calling
  %% this function.
  %% The JSON-encoded map is written to the avro file and the same map is
  %% cached in persistent_term; the decoded config argument is not used.
  put_plugin_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
      ok = write_avro_bin(NameVsn, emqx_utils_json:encode(AvroJsonMap)),
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(NameVsn),
      ok = persistent_term:put(Key, AvroJsonMap).
  %% @doc Stop and then start the plugin.
  %% The plugin is not started when stopping it fails.
  restart(NameVsn) ->
      case ensure_stopped(NameVsn) of
          {error, Reason} ->
              {error, Reason};
          ok ->
              ensure_started(NameVsn)
      end.
  %% @doc List all installed plugins.
  %% Including the ones that are installed, but not enabled in config.
  %% Plugins whose info cannot be read are logged and left out.
  %% Configured plugins come first, in configured order.
  -spec list() -> [plugin()].
  list() ->
      Pattern = filename:join([install_dir(), "*", "release.json"]),
      All = lists:filtermap(
          fun(JsonFilePath) ->
              %% the parent dir of release.json is named after the plugin
              [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)),
              case read_plugin_info(NameVsn, #{}) of
                  {ok, Info} ->
                      {true, Info};
                  {error, Reason} ->
                      %% The reason is a map for known errors, but tryit/2
                      %% yields `{failed, Op}' for unexpected ones; always log
                      %% a map that carries a `msg'.
                      LogData =
                          case is_map(Reason) of
                              true -> Reason;
                              false -> #{reason => Reason}
                          end,
                      ?SLOG(warning, LogData#{
                          msg => "failed_to_read_plugin_info",
                          name_vsn => NameVsn
                      }),
                      false
              end
          end,
          filelib:wildcard(Pattern)
      ),
      do_list(configured(), All).
  275. %%--------------------------------------------------------------------
  276. %% Package utils
  %% @doc Decode a plugin's avro config through `emqx_plugins_serde'.
  %% A map input is JSON-encoded first.
  -spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
  decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
      AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
      decode_plugin_avro_config(NameVsn, AvroJsonBin);
  decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
      case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
          {error, ReasonMap} -> {error, ReasonMap};
          {ok, Config} -> {ok, Config}
      end.
  %% Read a value under the plugins config root.
  %% A single atom key is treated as a one-element path.
  get_config(Key, Default) when is_atom(Key) ->
      get_config([Key], Default);
  get_config(Path, Default) ->
      FullPath = [?CONF_ROOT | Path],
      emqx_conf:get(FullPath, Default).
  %% Write a config value using the `local' config location.
  put_config(Key, Value) ->
      ConfLocation = local,
      do_put_config(Key, Value, ConfLocation).
  %% Return the content of the plugin's package file.
  %% When the package file cannot be read, an attempt is made to re-create it
  %% from the extracted plugin directory before reading it again.
  -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any()}.
  get_tar(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case file:read_file(TarGz) of
          {ok, Content} ->
              {ok, Content};
          {error, _} ->
              case maybe_create_tar(NameVsn, TarGz, install_dir()) of
                  ok ->
                      file:read_file(TarGz);
                  Err ->
                      Err
              end
      end.
  305. %%--------------------------------------------------------------------
  306. %% Internal
  307. %%--------------------------------------------------------------------
  %% Create a compressed tar named `TarGzName' from the files found under the
  %% plugin's directory. Entry names are the file paths with the install dir
  %% prefix stripped.
  %% Returns `{error, plugin_not_found}' when the plugin dir has no files.
  maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
      maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
      Pattern = filename:join(plugin_dir(NameVsn), "**"),
      case filelib:wildcard(Pattern) of
          [_ | _] = Files ->
              %% exactly one trailing slash, whatever the input had
              Prefix = string:trim(InstallDir, trailing, "/") ++ "/",
              Entries = lists:map(
                  fun(File) -> {string:prefix(File, Prefix), File} end,
                  Files
              ),
              erl_tar:create(TarGzName, Entries, [compressed]);
          _ ->
              {error, plugin_not_found}
      end.
  %% Write every `{Name, Bin}' entry of an in-memory tar extraction below
  %% `BaseDir', creating parent directories as needed.
  %% Crashes with `badmatch' when a directory or file cannot be written.
  write_tar_file_content(BaseDir, TarContent) ->
      WriteEntry = fun({Name, Bin}) ->
          Path = filename:join(BaseDir, Name),
          ok = filelib:ensure_dir(Path),
          ok = file:write_file(Path, Bin)
      end,
      lists:foreach(WriteEntry, TarContent).
  %% Undo write_tar_file_content/2: for every entry still present on disk,
  %% recursively delete the top-level dir or file below `BaseDir' that holds it.
  delete_tar_file_content(BaseDir, TarContent) ->
      DeleteEntry = fun({Name, _Bin}) ->
          Path = filename:join(BaseDir, Name),
          case filelib:is_file(Path) of
              false ->
                  %% probably already deleted
                  ok;
              true ->
                  ok = file:del_dir_r(top_dir(BaseDir, Path))
          end
      end,
      lists:foreach(DeleteEntry, TarContent).
  %% Walk up from `DirOrFile' and return the ancestor (or the path itself)
  %% that sits directly below the base dir.
  %% Throws `{out_of_bounds, Path}' when "/" or "." is reached first, i.e.
  %% the path is not below the base dir.
  top_dir(BaseDir0, DirOrFile) ->
      BaseDir = normalize_dir(BaseDir0),
      Parent = filename:dirname(DirOrFile),
      if
          Parent =:= "/"; Parent =:= "." ->
              throw({out_of_bounds, DirOrFile});
          Parent =:= BaseDir ->
              DirOrFile;
          true ->
              top_dir(BaseDir, Parent)
      end.
  %% Normalize a directory path: joining with an empty component drops a
  %% possible trailing slash.
  normalize_dir(Dir) ->
      Components = [Dir, ""],
      filename:join(Components).
  356. -ifdef(TEST).
  %% Relative and absolute dirs, with and without a trailing slash.
  normalize_dir_test_() ->
      Cases = [
          {"foo", "foo"},
          {"foo", "foo/"},
          {"/foo", "/foo"},
          {"/foo", "/foo/"}
      ],
      [?_assertEqual(Expected, normalize_dir(Input)) || {Expected, Input} <- Cases].
  %% Paths below the base resolve to their top-level entry; the base itself
  %% and paths outside of it are out of bounds.
  top_dir_test_() ->
      RelPath = filename:join(["base", "foo", "bar"]),
      AbsPath = filename:join(["/", "base", "foo", "bar"]),
      BaseItself = filename:join(["/", "base"]),
      Outside = filename:join(["/", "foo", "bar"]),
      [
          ?_assertEqual("base/foo", top_dir("base", RelPath)),
          ?_assertEqual("/base/foo", top_dir("/base", AbsPath)),
          ?_assertEqual("/base/foo", top_dir("/base/", AbsPath)),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", BaseItself)),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", Outside))
      ].
  372. -endif.
  %% Extract the plugin's package into install_dir and verify the result.
  %% If the plugin info cannot be read after extraction, the extracted files
  %% are removed again and the read error is returned.
  %% A missing or malformed package yields an `{error, Map}' that names the
  %% package path.
  do_ensure_installed(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case erl_tar:extract(TarGz, [compressed, memory]) of
          {ok, TarContent} ->
              ok = write_tar_file_content(install_dir(), TarContent),
              case read_plugin_info(NameVsn, #{}) of
                  {ok, _} ->
                      ok = maybe_post_op_after_install(NameVsn),
                      ok;
                  {error, Reason} ->
                      %% The reason is a map for known errors, but tryit/2
                      %% yields `{failed, Op}' for unexpected ones; a map
                      %% update on that tuple would raise `badmap' and skip
                      %% the cleanup below.
                      LogData =
                          case is_map(Reason) of
                              true -> Reason;
                              false -> #{reason => Reason}
                          end,
                      ?SLOG(warning, LogData#{msg => "failed_to_read_after_install"}),
                      ok = delete_tar_file_content(install_dir(), TarContent),
                      {error, Reason}
              end;
          {error, {_, enoent}} ->
              {error, #{
                  error_msg => "failed_to_extract_plugin_package",
                  path => TarGz,
                  reason => not_found
              }};
          {error, Reason} ->
              {error, #{
                  error_msg => "bad_plugin_package",
                  path => TarGz,
                  reason => Reason
              }}
      end.
  %% Remove the plugin's entry from the configured plugin list.
  ensure_delete(NameVsn0) ->
      Target = bin(NameVsn0),
      Configured = configured(),
      IsOther = fun(#{name_vsn := Nv}) -> bin(Nv) =/= Target end,
      put_configured(lists:filter(IsOther, Configured)),
      ok.
  %% Set the `enable' state of an installed plugin in the configured list,
  %% at the given position. Fails when the plugin info cannot be read.
  ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
      ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  ensure_state(NameVsn, Position, State, ConfLocation) ->
      case read_plugin_info(NameVsn, #{}) of
          {error, Reason} ->
              {error, Reason};
          {ok, _Info} ->
              Item = #{name_vsn => NameVsn, enable => State},
              Configure = fun() -> ensure_configured(Item, Position, ConfLocation) end,
              tryit("ensure_state", Configure)
      end.
  %% Put `Item' into the configured list and store the list.
  %% An existing entry of the same plugin is replaced in place for `no_move',
  %% or removed and re-added at `Position'; a new entry is added at `Position'.
  ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
      Configured = configured(),
      IsOther = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      NewConfigured =
          case lists:splitwith(IsOther, Configured) of
              {Front, [_Old | More]} when Position =:= no_move ->
                  Front ++ [Item | More];
              {Front, [_Old | More]} ->
                  add_new_configured(Front ++ More, Position, Item);
              {_Front, []} ->
                  add_new_configured(Configured, Position, Item)
          end,
      ok = put_configured(NewConfigured, ConfLocation).
  %% Insert `Item' into a configured list that does not contain it yet.
  %% `{before | behind, NameVsn}' places it relative to an anchor plugin and
  %% throws a reason map when the anchor is not configured.
  add_new_configured(Configured, no_move, Item) ->
      %% default to rear
      add_new_configured(Configured, rear, Item);
  add_new_configured(Configured, front, Item) ->
      [Item | Configured];
  add_new_configured(Configured, rear, Item) ->
      Configured ++ [Item];
  add_new_configured(Configured, {Action, NameVsn}, Item) ->
      IsNotAnchor = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      case lists:splitwith(IsNotAnchor, Configured) of
          {_Front, []} ->
              throw(#{
                  error_msg => "position_anchor_plugin_not_configured",
                  hint => "maybe_install_and_configure",
                  name_vsn => NameVsn
              });
          {Front, [Anchor | AfterAnchor] = Rear} ->
              case Action of
                  before -> Front ++ [Item | Rear];
                  behind -> Front ++ [Anchor, Item | AfterAnchor]
              end
      end.
  %% Erase the plugin's cached config from persistent_term, if any.
  maybe_purge_plugin_config(NameVsn) ->
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(NameVsn),
      _ = persistent_term:erase(Key),
      ok.
  %% Delete the plugin's directory.
  purge_plugin(NameVsn) ->
      purge_plugin_dir(plugin_dir(NameVsn)).
  %% Recursively delete `Dir'. A missing dir is fine; any other failure is
  %% logged and returned as `{error, Reason}'.
  purge_plugin_dir(Dir) ->
      case file:del_dir_r(Dir) of
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_purge_plugin_dir",
                  dir => Dir,
                  reason => Reason
              }),
              {error, Reason};
          ok ->
              ?SLOG(info, #{
                  msg => "purged_plugin_dir",
                  dir => Dir
              })
      end.
  %% Make sure configured ones are ordered in front.
  %% Walks the configured list, pulling each matching installed plugin to the
  %% front; configured plugins that are not installed are logged and skipped.
  do_list([], All) ->
      All;
  do_list([#{name_vsn := NameVsn} | Rest], All) ->
      IsOther = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          bin([Name, "-", Vsn]) =/= bin(NameVsn)
      end,
      case lists:splitwith(IsOther, All) of
          {Front, [Found | Rear]} ->
              [Found | do_list(Rest, Front ++ Rear)];
          {_, []} ->
              ?SLOG(warning, #{
                  msg => "configured_plugin_not_installed",
                  name_vsn => NameVsn
              }),
              do_list(Rest, All)
      end.
  %% Make sure the plugin is installed, then load its code and start its apps.
  %% A plugin that cannot be found is logged and reported as `ok'.
  do_ensure_started(NameVsn) ->
      Start = fun() ->
          case ensure_exists_and_installed(NameVsn) of
              {error, plugin_not_found} ->
                  ?SLOG(error, #{
                      error_msg => "plugin_not_found",
                      name_vsn => NameVsn
                  }),
                  ok;
              ok ->
                  Plugin = do_read_plugin(NameVsn),
                  ok = load_code_start_apps(NameVsn, Plugin)
          end
      end,
      tryit("start_plugins", Start).
  515. %%--------------------------------------------------------------------
  %% Run `F' and return its result.
  %% `throw' exceptions are known errors and become `{error, Thrown}'.
  %% `error' exceptions are logged with their stacktrace and become
  %% `{error, {failed, WhichOp}}'. `exit' exceptions are not caught.
  tryit(WhichOp, F) ->
      try F() of
          Result -> Result
      catch
          error:Reason:Stacktrace ->
              %% unexpected errors, log stacktrace
              ?SLOG(warning, #{
                  msg => "plugin_op_failed",
                  which_op => WhichOp,
                  exception => Reason,
                  stacktrace => Stacktrace
              }),
              {error, {failed, WhichOp}};
          throw:ReasonMap ->
              %% translate to a return value without stacktrace
              {error, ReasonMap}
      end.
  %% Read the plugin info from its JSON info file.
  %% Returns `{ok, Info}', or `{error, Reason}' when reading or checking fails.
  read_plugin_info(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      Read = fun() -> {ok, do_read_plugin2(NameVsn, Options)} end,
      tryit(OpName, Read).
  %% Read the plugin info with default options; raises on failure.
  do_read_plugin(NameVsn) ->
      Options = #{},
      do_read_plugin2(NameVsn, Options).
  %% Resolve the plugin's info file path and read the info from it.
  do_read_plugin2(NameVsn, Option) ->
      InfoFilePath = info_file_path(NameVsn),
      do_read_plugin3(NameVsn, InfoFilePath, Option).
  %% Read and check the info file, optionally add the README, then add the
  %% running and config status.
  do_read_plugin3(NameVsn, InfoFilePath, Options) ->
      ReadInfoFile = read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}),
      {ok, PlainMap} = ReadInfoFile(),
      Checked = check_plugin(PlainMap, NameVsn, InfoFilePath),
      WithReadme = plugins_readme(NameVsn, Options, Checked),
      plugin_status(NameVsn, WithReadme).
  %% Read the plugin's avsc file; the `json_map' read mode is the default.
  read_plugin_avsc(NameVsn) ->
      Options = #{read_mode => ?JSON_MAP},
      read_plugin_avsc(NameVsn, Options).
  read_plugin_avsc(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFun = read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options),
      tryit(OpName, ReadFun).
  %% Read the plugin's i18n file; the `json_map' read mode is the default.
  read_plugin_i18n(NameVsn) ->
      Options = #{read_mode => ?JSON_MAP},
      read_plugin_i18n(NameVsn, Options).
  read_plugin_i18n(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFun = read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options),
      tryit(OpName, ReadFun).
  %% Read the plugin's avro config file; the raw `binary' read mode is the
  %% default.
  read_plugin_avro(NameVsn) ->
      Options = #{read_mode => ?RAW_BIN},
      read_plugin_avro(NameVsn, Options).
  read_plugin_avro(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFun = read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options),
      tryit(OpName, ReadFun).
  %% Make sure the plugin's directory exists.
  %% If it does not, install from the local package when one can be obtained,
  %% otherwise try to fetch the package from another node of the cluster.
  ensure_exists_and_installed(NameVsn) ->
      IsExtracted = filelib:is_dir(plugin_dir(NameVsn)),
      %% get_tar/1 is only consulted when the dir is missing
      case IsExtracted orelse get_tar(NameVsn) of
          true ->
              ok;
          {ok, TarContent} ->
              %% We have the package, but it's not extracted yet.
              ok = file:write_file(pkg_file_path(NameVsn), TarContent),
              ok = do_ensure_installed(NameVsn);
          _ ->
              %% If not, try to get it from the cluster.
              do_get_from_cluster(NameVsn)
      end.
  %% Fetch the plugin package from any other running node, store it locally
  %% and install it. Returns `{error, plugin_not_found}' when no node
  %% provides the package or when there are no other nodes.
  do_get_from_cluster(NameVsn) ->
      Self = node(),
      Nodes = lists:filter(fun(N) -> N /= Self end, mria:running_nodes()),
      case get_from_any_node(Nodes, NameVsn, []) of
          {ok, TarContent} ->
              ok = file:write_file(pkg_file_path(NameVsn), TarContent),
              ok = do_ensure_installed(NameVsn);
          {error, _} when Nodes =:= [] ->
              ?SLOG(error, #{
                  msg => "no_nodes_to_copy_plugin_from",
                  name_vsn => NameVsn
              }),
              {error, plugin_not_found};
          {error, NodeErrors} ->
              ?SLOG(error, #{
                  msg => "failed_to_copy_plugin_from_other_nodes",
                  name_vsn => NameVsn,
                  node_errors => NodeErrors
              }),
              {error, plugin_not_found}
      end.
  %% Ask the nodes one by one for the plugin package and return the first
  %% `{ok, _}' answer. When no node succeeds, return `{error, Errors}' with
  %% the `{Node, Answer}' failures, most recent first.
  get_from_any_node([], _NameVsn, Errors) ->
      {error, Errors};
  get_from_any_node([Node | MoreNodes], NameVsn, Errors) ->
      Answer = emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity),
      case Answer of
          {ok, _} ->
              Answer;
          _ ->
              get_from_any_node(MoreNodes, NameVsn, [{Node, Answer} | Errors])
      end.
  %% Add the README content under the `readme' key when `fill_readme' is
  %% true; an unreadable README yields an empty binary.
  plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
      Readme =
          case file:read_file(readme_file(NameVsn)) of
              {ok, Bin} -> Bin;
              _ -> <<>>
          end,
      Info#{readme => Readme};
  plugins_readme(_NameVsn, _Options, Info) ->
      Info.
  %% Add `running_status' (running | loaded | stopped) and `config_status'
  %% (enabled | disabled | not_configured) to the plugin info.
  plugin_status(NameVsn, Info) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      RunningSt =
          case application:get_key(AppName, vsn) of
              undefined ->
                  %% the application is not even loaded
                  stopped;
              {ok, _} ->
                  case lists:keyfind(AppName, 1, running_apps()) of
                      {AppName, _} -> running;
                      _ -> loaded
                  end
          end,
      EnableOfThisPlugin = fun(#{name_vsn := Nv, enable := St}) ->
          case bin(Nv) =:= bin(NameVsn) of
              false -> false;
              true -> {true, St}
          end
      end,
      ConfSt =
          case lists:filtermap(EnableOfThisPlugin, configured()) of
              [] -> not_configured;
              [true] -> enabled;
              [false] -> disabled
          end,
      Info#{
          running_status => RunningSt,
          config_status => ConfSt
      }.
  %% Validate the parsed info file and return it unchanged.
  %% Throws a reason map when mandatory fields are missing, when
  %% `<name>-<rel_vsn>' does not equal `NameVsn', or when `rel_apps' is not a
  %% non-empty list of `<app>-<vsn>' strings.
  check_plugin(
      #{
          <<"name">> := Name,
          <<"rel_vsn">> := Vsn,
          <<"rel_apps">> := Apps,
          <<"description">> := _
      } = Info,
      NameVsn,
      FilePath
  ) ->
      case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
          false ->
              throw(#{
                  error_msg => "name_vsn_mismatch",
                  name_vsn => NameVsn,
                  path => FilePath,
                  name => Name,
                  rel_vsn => Vsn
              });
          true ->
              AssertApp = fun(App) -> {ok, _, _} = parse_name_vsn(App) end,
              try
                  %% assert non-empty
                  [_ | _] = Apps,
                  %% validate if the list is all <app>-<vsn> strings
                  lists:foreach(AssertApp, Apps)
              catch
                  _:_ ->
                      throw(#{
                          error_msg => "bad_rel_apps",
                          rel_apps => Apps,
                          hint => "A non-empty string list of app_name-app_vsn format"
                      })
              end,
              Info
      end;
  check_plugin(_What, NameVsn, File) ->
      throw(#{
          error_msg => "bad_info_file_content",
          mandatory_fields => [rel_vsn, name, rel_apps, description],
          name_vsn => NameVsn,
          path => File
      }).
  %% Load every app listed in `rel_apps' from the plugin's install dir, then
  %% start them in the listed order. All apps are loaded before any is started.
  load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
      LibDir = filename:join([install_dir(), RelNameVsn]),
      RunningApps = running_apps(),
      %% load plugin apps and beam code
      LoadApp = fun(AppNameVsn) ->
          {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
          EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
          ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
          AppName
      end,
      AppNames = lists:map(LoadApp, Apps),
      lists:foreach(fun start_app/1, AppNames).
  %% Load the app unless it is already running.
  %% A running app of another version is left alone and a warning is logged.
  load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
      case lists:keyfind(AppName, 1, RunningApps) of
          {_, RunningVsn} ->
              SameVsn = bin(RunningVsn) =:= bin(AppVsn),
              if
                  SameVsn ->
                      %% already started on the exact version
                      ok;
                  true ->
                      %% running but a different version
                      ?SLOG(warning, #{
                          msg => "plugin_app_already_running",
                          name => AppName,
                          running_vsn => RunningVsn,
                          loading_vsn => AppVsn
                      })
              end;
          false ->
              do_load_plugin_app(AppName, Ebin)
      end.
  %% Put the ebin dir at the front of the code path, (re)load every beam file
  %% in it, then load the application.
  %% Throws a reason map when a beam file or the application cannot be loaded.
  do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
      do_load_plugin_app(AppName, binary_to_list(Ebin));
  do_load_plugin_app(AppName, Ebin) ->
      _ = code:add_patha(Ebin),
      BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
      LoadBeam = fun(BeamFile) ->
          Module = list_to_atom(filename:basename(BeamFile, ".beam")),
          %% purge old code before loading the module
          _ = code:purge(Module),
          case code:load_file(Module) of
              {error, Reason} ->
                  throw(#{
                      error_msg => "failed_to_load_plugin_beam",
                      path => BeamFile,
                      reason => Reason
                  });
              {module, _} ->
                  ok
          end
      end,
      lists:foreach(LoadBeam, BeamFiles),
      case application:load(AppName) of
          ok ->
              ok;
          {error, {already_loaded, _}} ->
              ok;
          {error, LoadError} ->
              throw(#{
                  error_msg => "failed_to_load_plugin_app",
                  name => AppName,
                  reason => LoadError
              })
      end.
  %% Start the app together with its dependencies.
  %% Throws a reason map when any of them fails to start.
  start_app(App) ->
      case application:ensure_all_started(App) of
          {error, {ErrApp, Reason}} ->
              throw(#{
                  error_msg => "failed_to_start_plugin_app",
                  app => App,
                  err_app => ErrApp,
                  reason => Reason
              });
          {ok, Started} ->
              case Started of
                  [] -> ok;
                  _ -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started})
              end,
              ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
              ok
      end.
  %% Stop all apps installed by the plugin package,
  %% but not the ones shared with others.
  %% Apps that cannot be stopped because running apps still depend on them
  %% are logged and the call still returns `ok'.
  ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
      %% resolve the app names from the <app>-<vsn> entries
      AppsToStop = [
          begin
              {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
              AppName
          end
       || NameVsn <- Apps
      ],
      case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
          {error, Reason} ->
              {error, Reason};
          {ok, []} ->
              %% all apps stopped
              ok;
          {ok, Left} ->
              ?SLOG(warning, #{
                  msg => "unabled_to_stop_plugin_apps",
                  apps => Left,
                  reason => "running_apps_still_depends_on_this_apps"
              }),
              ok
      end.
  %% Stop the apps in passes until all are stopped, or until a pass makes no
  %% progress. Returns `{ok, Remaining}' with the apps that are still needed.
  stop_apps(Apps) ->
      case do_stop_apps(Apps, [], running_apps()) of
          %% all stopped
          {ok, []} -> {ok, []};
          %% no progress: the same apps remain
          {ok, Apps} -> {ok, Apps};
          %% try again
          {ok, Remain} -> stop_apps(Remain)
      end.
  %% One pass over `Apps', in order: an app that some entry of `RunningApps'
  %% still depends on is kept, any other app is stopped.
  %% Returns `{ok, Kept}' where `Kept' is `Remain' (in reverse) followed by
  %% the apps kept in this pass, in their original order.
  do_stop_apps(Apps, Remain, RunningApps) ->
      StopOrKeep =
          fun(App, Kept) ->
              case is_needed_by_any(App, RunningApps) of
                  true ->
                      [App | Kept];
                  false ->
                      ok = stop_app(App),
                      Kept
              end
          end,
      {ok, lists:reverse(lists:foldl(StopOrKeep, Remain, Apps))}.
  %% Stop a single application and unload its modules and app spec.
  %% An application that was not started is unloaded all the same.
  %% Any other stop error is thrown as a map.
  stop_app(App) ->
      case application:stop(App) of
          {error, {not_started, App}} ->
              ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
              ok = unload_moudle_and_app(App);
          {error, Reason} ->
              throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason});
          ok ->
              ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
              ok = unload_moudle_and_app(App)
      end.
  %% Soft-purge every module listed in the app spec of `App' (if the spec is
  %% available) and then unload the application. The result of the unload
  %% call is ignored; the function always returns `ok'.
  unload_moudle_and_app(App) ->
      Modules =
          case application:get_key(App, modules) of
              {ok, Mods} -> Mods;
              _ -> []
          end,
      lists:foreach(fun(Mod) -> code:soft_purge(Mod) end, Modules),
      _ = application:unload(App),
      ok.
  %% True when at least one entry of `RunningApps' (a list of `{App, Vsn}'
  %% pairs) depends on `AppToStop'.
  is_needed_by_any(AppToStop, RunningApps) ->
      DependsOnIt =
          fun({RunningApp, _Vsn}) ->
              is_needed_by(AppToStop, RunningApp)
          end,
      lists:any(DependsOnIt, RunningApps).
  %% True when `AppToStop' is listed in the `applications' key of
  %% `RunningApp'. An app never counts as needed by itself, and an app whose
  %% key cannot be read counts as having no dependencies.
  is_needed_by(AppToStop, RunningApp) when AppToStop =:= RunningApp ->
      false;
  is_needed_by(AppToStop, RunningApp) ->
      case application:get_key(RunningApp, applications) of
          undefined -> false;
          {ok, Deps} -> lists:member(AppToStop, Deps)
      end.
  %% Write `Values' below the plugins config root.
  %% The first argument is either a single atom key (treated as a one-element
  %% path) or a list path. `local' updates through emqx:update_config/3,
  %% `global' through emqx_conf:update/3.
  %% Returns `ok' when the update call returns `{ok, _}', otherwise whatever
  %% the update call returned.
  do_put_config(Key, Value, ConfLocation) when is_atom(Key) ->
      do_put_config([Key], Value, ConfLocation);
  do_put_config(Path, Values, local) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      FullPath = [?CONF_ROOT | Path],
      %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
      case emqx:update_config(FullPath, bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Error -> Error
      end;
  do_put_config(Path, Values, global) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      FullPath = [?CONF_ROOT | Path],
      case emqx_conf:update(FullPath, bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Error -> Error
      end.
  875. %%--------------------------------------------------------------------
  876. %% `emqx_config_handler' API
  877. %%--------------------------------------------------------------------
  %% Config handler callback. When the whole plugins root changes and both the
  %% new and old config carry `states', the plugin states are indexed by
  %% `name_vsn', diffed, and enable_disable_plugin/2 is applied to every
  %% changed entry. Any other update is accepted without action.
  post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
      IndexByNameVsn =
          fun(States) ->
              maps:from_list([{NameVsn, State} || #{name_vsn := NameVsn} = State <- States])
          end,
      #{changed := Changed} =
          emqx_utils_maps:diff_maps(IndexByNameVsn(NewStates), IndexByNameVsn(OldStates)),
      maps:foreach(fun enable_disable_plugin/2, Changed),
      ok;
  post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
      ok.
  %% React to a changed plugin state. `Diff' is a pair of state maps: when the
  %% first has `enable := true' and the second `enable := false' the plugin is
  %% stopped; in the opposite case it is started; anything else is ignored.
  %% The result of the stop/start call is discarded (per the original note,
  %% errors are already logged inside those functions).
  enable_disable_plugin(NameVsn, Diff) ->
      case Diff of
          {#{enable := true}, #{enable := false}} ->
              _ = ensure_stopped(NameVsn),
              ok;
          {#{enable := false}, #{enable := true}} ->
              _ = ensure_started(NameVsn),
              ok;
          _ ->
              ok
      end.
  896. %%--------------------------------------------------------------------
  897. %% Helper functions
  898. %%--------------------------------------------------------------------
  %% Configured plugin installation directory; an empty string when the
  %% `install_dir' setting is absent.
  install_dir() ->
      Default = "",
      get_config(install_dir, Default).
  %% Store the list of configured plugin states using the `local'
  %% config location.
  put_configured(Configured) ->
      ConfLocation = local,
      put_configured(Configured, ConfLocation).
  %% Store the configured plugin states under the `states' key at the given
  %% config location, after converting map keys to binaries.
  %% Crashes with badmatch unless the update returns `ok'.
  put_configured(Configured, ConfLocation) ->
      States = bin_key(Configured),
      ok = do_put_config(states, States, ConfLocation).
  %% Configured plugin states; an empty list when the `states' setting
  %% is absent.
  configured() ->
      Default = [],
      get_config(states, Default).
  %% Run `ActionFun' for every configured plugin (see for_plugin/2) and
  %% collect the reported errors. Returns `ok' when there are none, otherwise
  %% raises an error carrying the fun and the list of errors.
  for_plugins(ActionFun) ->
      Errors = [Error || Plugin <- configured(), Error <- for_plugin(Plugin, ActionFun)],
      case Errors of
          [] -> ok;
          _ -> erlang:error(#{function => ActionFun, errors => Errors})
      end.
  %% Apply `Fun' to the name-vsn of one configured plugin.
  %% Returns a list of errors: empty when the plugin is disabled (which is
  %% only logged) or when `Fun' returns `ok'; `[{NameVsn, Reason}]' when
  %% `Fun' returns `{error, Reason}'.
  for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
      ?SLOG(debug, #{
          msg => "plugin_disabled",
          name_vsn => NameVsn
      }),
      [];
  for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
      case Fun(NameVsn) of
          {error, Reason} -> [{NameVsn, Reason}];
          ok -> []
      end.
  %% Best-effort steps after a plugin has been installed: load its config
  %% schema, then create its config directory. The results of both steps
  %% are ignored; always returns `ok'.
  maybe_post_op_after_install(NameVsn) ->
      PostOps = [fun maybe_load_config_schema/1, fun maybe_create_config_dir/1],
      lists:foreach(fun(PostOp) -> PostOp(NameVsn) end, PostOps).
  %% Load the plugin's config schema when its schema file exists as a
  %% regular file. Returns `false' when there is no such file, otherwise the
  %% result of do_load_config_schema/2.
  maybe_load_config_schema(NameVsn) ->
      AvscPath = avsc_file_path(NameVsn),
      case filelib:is_regular(AvscPath) of
          true -> do_load_config_schema(NameVsn, AvscPath);
          false -> false
      end.
  %% Register the schema file at `AvscPath' for plugin `NameVsn' through
  %% emqx_plugins_serde:add_schema/2.
  %% Loading stays best-effort: the function always returns `ok'. A schema
  %% that already exists is fine; any other failure is logged as a warning
  %% instead of being dropped silently, so a broken schema file leaves a trace.
  do_load_config_schema(NameVsn, AvscPath) ->
      case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
          ok ->
              ok;
          {error, already_exists} ->
              ok;
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_load_plugin_config_schema",
                  name_vsn => NameVsn,
                  avsc_path => AvscPath,
                  reason => Reason
              }),
              ok
      end.
  %% Make sure the plugin's config directory exists.
  %% Returns `ok', or logs a warning and returns
  %% `{error, {mkdir_failed, Dir, Reason}}' when it cannot be created.
  maybe_create_config_dir(NameVsn) ->
      Dir = plugin_config_dir(NameVsn),
      case filelib:ensure_path(Dir) of
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_create_plugin_config_dir",
                  dir => Dir,
                  reason => Reason
              }),
              {error, {mkdir_failed, Dir, Reason}};
          ok ->
              ok
      end.
  %% Write `AvroBin' to the plugin's avro config file.
  %% Crashes with badmatch when the write fails.
  write_avro_bin(NameVsn, AvroBin) ->
      ConfigFile = avro_config_file(NameVsn),
      ok = file:write_file(ConfigFile, AvroBin).
  %% Build a zero-arity fun that reads `Path' according to `read_mode':
  %%  - ?RAW_BIN: the file content as returned by file:read_file/1;
  %%  - ?JSON_MAP: the file loaded with hocon and converted to a plain map.
  %% The fun returns `{ok, Content}' or throws
  %% `#{error_msg => ErrMsg, reason => Reason}' when reading fails.
  read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) ->
      fun() ->
          case file:read_file(Path) of
              {error, Reason} ->
                  throw(#{error_msg => ErrMsg, reason => Reason});
              {ok, Bin} ->
                  {ok, Bin}
          end
      end;
  read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
      fun() ->
          case hocon:load(Path, #{format => richmap}) of
              {error, Reason} ->
                  throw(#{error_msg => ErrMsg, reason => Reason});
              {ok, RichMap} ->
                  PlainMap = hocon_maps:ensure_plain(RichMap),
                  {ok, PlainMap}
          end
      end.
  %% Directories
  %% Directory of one plugin: `<install_dir>/<NameVsn>'.
  plugin_dir(NameVsn) ->
      InstallDir = install_dir(),
      filename:join([InstallDir, NameVsn]).
  %% Config directory of one plugin: `<plugin_dir>/data/configs'.
  plugin_config_dir(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join([PluginDir, "data", "configs"]).
  977. %% Files
  %% Path of the plugin package: `<install_dir>/<NameVsn>.tar.gz'.
  pkg_file_path(NameVsn) ->
      PkgName = bin([NameVsn, ".tar.gz"]),
      filename:join([install_dir(), PkgName]).
  %% Path of the plugin's `release.json' inside its plugin directory.
  info_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join([PluginDir, "release.json"]).
  %% Path of the plugin's `config_schema.avsc' inside its plugin directory.
  avsc_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join([PluginDir, "config_schema.avsc"]).
  %% Path of the plugin's `config.avro' inside its config directory.
  avro_config_file(NameVsn) ->
      ConfigDir = plugin_config_dir(NameVsn),
      filename:join([ConfigDir, "config.avro"]).
  %% Path of the plugin's `config_i18n.json' inside its plugin directory.
  i18n_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join([PluginDir, "config_i18n.json"]).
  %% Path of the plugin's `README.md' inside its plugin directory.
  readme_file(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join([PluginDir, "README.md"]).
  %% Currently running applications as `{Name, Vsn}' pairs, i.e. the result
  %% of application:which_applications/1 without the description field.
  running_apps() ->
      Running = application:which_applications(infinity),
      [{Name, Vsn} || {Name, _Desc, Vsn} <- Running].
  %% Convert the top-level keys of a map to binaries (values are left
  %% untouched). A list whose first element is a map is converted element by
  %% element; any other term is returned unchanged.
  bin_key(Map) when is_map(Map) ->
      ToBinKey = fun(Key, Value, Acc) -> maps:put(bin(Key), Value, Acc) end,
      maps:fold(ToBinKey, #{}, Map);
  bin_key([#{} | _] = Maps) ->
      lists:map(fun bin_key/1, Maps);
  bin_key(Other) ->
      Other.
  %% Convert an atom, character data given as a list, or a binary to a
  %% binary. Binaries are returned unchanged; atoms and lists are encoded
  %% as UTF-8.
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8);
  bin(Chars) when is_list(Chars) ->
      unicode:characters_to_binary(Chars, utf8).