emqx_plugins.erl 47 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -feature(maybe_expr, enable).
  18. -include("emqx_plugins.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -include_lib("snabbkaffe/include/trace.hrl").
  21. -ifdef(TEST).
  22. -include_lib("eunit/include/eunit.hrl").
  23. -endif.
  24. -export([
  25. describe/1,
  26. plugin_schema_json/1,
  27. plugin_i18n_json/1,
  28. raw_plugin_config_content/1,
  29. parse_name_vsn/1,
  30. make_name_vsn_string/2
  31. ]).
  32. %% Package operations
  33. -export([
  34. ensure_installed/0,
  35. ensure_installed/1,
  36. ensure_uninstalled/1,
  37. ensure_enabled/1,
  38. ensure_enabled/2,
  39. ensure_enabled/3,
  40. ensure_disabled/1,
  41. purge/1,
  42. delete_package/1
  43. ]).
  44. %% Plugin runtime management
  45. -export([
  46. ensure_started/0,
  47. ensure_started/1,
  48. ensure_stopped/0,
  49. ensure_stopped/1,
  50. restart/1,
  51. list/0
  52. ]).
  53. %% Plugin config APIs
  54. -export([
  55. get_config/1,
  56. get_config/2,
  57. get_config/3,
  58. get_config/4,
  59. put_config/3
  60. ]).
  61. %% Package utils
  62. -export([
  63. decode_plugin_config_map/2,
  64. install_dir/0,
  65. avsc_file_path/1,
  66. with_plugin_avsc/1
  67. ]).
  68. %% `emqx_config_handler' API
  69. -export([
  70. post_config_update/5
  71. ]).
  72. %% RPC call
  73. -export([get_tar/1]).
  74. %% Internal export
  75. -export([
  76. ensure_config_map/1,
  77. do_ensure_started/1
  78. ]).
  79. %% for test cases
  80. -export([put_config_internal/2]).
  81. -ifdef(TEST).
  82. -compile(export_all).
  83. -compile(nowarn_export_all).
  84. -endif.
  85. %% Defines
  86. -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
  87. -define(RAW_BIN, binary).
  88. -define(JSON_MAP, json_map).
  89. -define(MAX_KEEP_BACKUP_CONFIGS, 10).
  90. %%--------------------------------------------------------------------
  91. %% APIs
  92. %%--------------------------------------------------------------------
  93. %% @doc Describe a plugin.
  94. -spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
  95. describe(NameVsn) ->
  96. read_plugin_info(NameVsn, #{fill_readme => true}).
  97. -spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
  98. plugin_schema_json(NameVsn) ->
  99. read_plugin_avsc(NameVsn).
  100. -spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
  101. plugin_i18n_json(NameVsn) ->
  102. read_plugin_i18n(NameVsn).
  103. -spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
  104. raw_plugin_config_content(NameVsn) ->
  105. read_plugin_hocon(NameVsn).
  106. parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
  107. parse_name_vsn(binary_to_list(NameVsn));
  108. parse_name_vsn(NameVsn) when is_list(NameVsn) ->
  109. case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
  110. {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
  111. _ -> {error, "bad_name_vsn"}
  112. end.
  113. make_name_vsn_string(Name, Vsn) ->
  114. binary_to_list(iolist_to_binary([Name, "-", Vsn])).
  115. %%--------------------------------------------------------------------
  116. %% Package operations
  117. %% @doc Start all configured plugins are started.
  118. -spec ensure_installed() -> ok.
  119. ensure_installed() ->
  120. Fun = fun(#{name_vsn := NameVsn}) ->
  121. case ensure_installed(NameVsn) of
  122. ok -> [];
  123. {error, Reason} -> [{NameVsn, Reason}]
  124. end
  125. end,
  126. ok = for_plugins(Fun).
  127. %% @doc Install a .tar.gz package placed in install_dir.
  128. -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  129. ensure_installed(NameVsn) ->
  130. case read_plugin_info(NameVsn, #{}) of
  131. {ok, _} ->
  132. ok,
  133. _ = maybe_ensure_plugin_config(NameVsn);
  134. {error, _} ->
  135. ok = purge(NameVsn),
  136. case ensure_exists_and_installed(NameVsn) of
  137. ok ->
  138. maybe_post_op_after_installed(NameVsn),
  139. _ = maybe_ensure_plugin_config(NameVsn),
  140. ok;
  141. {error, _Reason} = Err ->
  142. Err
  143. end
  144. end.
  145. %% @doc Ensure files and directories for the given plugin are being deleted.
  146. %% If a plugin is running, or enabled, an error is returned.
  147. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  148. ensure_uninstalled(NameVsn) ->
  149. case read_plugin_info(NameVsn, #{}) of
  150. {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
  151. {error, #{
  152. error_msg => "bad_plugin_running_status",
  153. hint => "stop_the_plugin_first"
  154. }};
  155. {ok, #{config_status := enabled}} ->
  156. {error, #{
  157. error_msg => "bad_plugin_config_status",
  158. hint => "disable_the_plugin_first"
  159. }};
  160. _ ->
  161. purge(NameVsn),
  162. ensure_delete(NameVsn)
  163. end.
  164. %% @doc Ensure a plugin is enabled to the end of the plugins list.
  165. -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  166. ensure_enabled(NameVsn) ->
  167. ensure_enabled(NameVsn, no_move).
  168. %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  169. -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  170. ensure_enabled(NameVsn, Position) ->
  171. ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
  172. -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  173. ensure_enabled(NameVsn, Position, ConfLocation) when
  174. ConfLocation =:= local; ConfLocation =:= global
  175. ->
  176. ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
  177. %% @doc Ensure a plugin is disabled.
  178. -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  179. ensure_disabled(NameVsn) ->
  180. ensure_state(NameVsn, no_move, false, _ConfLocation = local).
  181. %% @doc Delete extracted dir
  182. %% In case one lib is shared by multiple plugins.
  183. %% it might be the case that purging one plugin's install dir
  184. %% will cause deletion of loaded beams.
  185. %% It should not be a problem, because shared lib should
  186. %% reside in all the plugin install dirs.
  187. -spec purge(name_vsn()) -> ok.
  188. purge(NameVsn) ->
  189. _ = maybe_purge_plugin_config(NameVsn),
  190. purge_plugin(NameVsn).
  191. %% @doc Delete the package file.
  192. -spec delete_package(name_vsn()) -> ok.
  193. delete_package(NameVsn) ->
  194. File = pkg_file_path(NameVsn),
  195. _ = emqx_plugins_serde:delete_schema(NameVsn),
  196. case file:delete(File) of
  197. ok ->
  198. ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
  199. ok;
  200. {error, enoent} ->
  201. ok;
  202. {error, Reason} ->
  203. ?SLOG(error, #{
  204. msg => "failed_to_delete_package_file",
  205. path => File,
  206. reason => Reason
  207. }),
  208. {error, Reason}
  209. end.
  210. %%--------------------------------------------------------------------
  211. %% Plugin runtime management
  212. %% @doc Start all configured plugins are started.
  213. -spec ensure_started() -> ok.
  214. ensure_started() ->
  215. Fun = fun
  216. (#{name_vsn := NameVsn, enable := true}) ->
  217. case do_ensure_started(NameVsn) of
  218. ok -> [];
  219. {error, Reason} -> [{NameVsn, Reason}]
  220. end;
  221. (#{name_vsn := NameVsn, enable := false}) ->
  222. ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
  223. []
  224. end,
  225. ok = for_plugins(Fun).
  226. %% @doc Start a plugin from Management API or CLI.
  227. %% the input is a <name>-<vsn> string.
  228. -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  229. ensure_started(NameVsn) ->
  230. case do_ensure_started(NameVsn) of
  231. ok ->
  232. ok;
  233. {error, Reason} ->
  234. ?SLOG(alert, Reason#{msg => "failed_to_start_plugin"}),
  235. {error, Reason}
  236. end.
  237. %% @doc Stop all plugins before broker stops.
  238. -spec ensure_stopped() -> ok.
  239. ensure_stopped() ->
  240. Fun = fun
  241. (#{name_vsn := NameVsn, enable := true}) ->
  242. case ensure_stopped(NameVsn) of
  243. ok -> [];
  244. {error, Reason} -> [{NameVsn, Reason}]
  245. end;
  246. (#{name_vsn := NameVsn, enable := false}) ->
  247. ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
  248. []
  249. end,
  250. ok = for_plugins(Fun).
  251. %% @doc Stop a plugin from Management API or CLI.
  252. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  253. ensure_stopped(NameVsn) ->
  254. tryit(
  255. "stop_plugin",
  256. fun() ->
  257. Plugin = do_read_plugin(NameVsn),
  258. ensure_apps_stopped(Plugin)
  259. end
  260. ).
  261. get_config(Name, Vsn, Opt, Default) ->
  262. get_config(make_name_vsn_string(Name, Vsn), Opt, Default).
  263. -spec get_config(name_vsn()) ->
  264. {ok, plugin_config_map() | any()}
  265. | {error, term()}.
  266. get_config(NameVsn) ->
  267. get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}).
  268. -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
  269. {ok, raw_plugin_config_content() | plugin_config_map() | any()}
  270. | {error, term()}.
  271. get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
  272. get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{});
  273. get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
  274. get_config_bin(NameVsn).
  275. %% Present default config value only in map format.
  276. -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
  277. {ok, plugin_config_map() | any()}
  278. | {error, term()}.
  279. get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
  280. {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}.
  281. get_config_bin(NameVsn) ->
  282. %% no default value when get raw binary config
  283. case read_plugin_hocon(NameVsn) of
  284. {ok, _Map} = Res -> Res;
  285. {error, _Reason} = Err -> Err
  286. end.
  287. %% @doc Update plugin's config.
  288. %% RPC call from Management API or CLI.
  289. %% The plugin config Json Map was valid by avro schema
  290. %% Or: if no and plugin config ALWAYS be valid before calling this function.
  291. put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) ->
  292. put_config(bin(NameVsn), ConfigJsonMap, AvroValue);
  293. put_config(NameVsn, ConfigJsonMap, _AvroValue) ->
  294. HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
  295. ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
  296. %% TODO: callback in plugin's on_config_changed (config update by mgmt API)
  297. %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
  298. ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap),
  299. ok.
  300. %% @doc Stop and then start the plugin.
  301. restart(NameVsn) ->
  302. case ensure_stopped(NameVsn) of
  303. ok -> ensure_started(NameVsn);
  304. {error, Reason} -> {error, Reason}
  305. end.
  306. %% @doc List all installed plugins.
  307. %% Including the ones that are installed, but not enabled in config.
  308. -spec list() -> [plugin_info()].
  309. list() ->
  310. Pattern = filename:join([install_dir(), "*", "release.json"]),
  311. All = lists:filtermap(
  312. fun(JsonFilePath) ->
  313. [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)),
  314. case read_plugin_info(NameVsn, #{}) of
  315. {ok, Info} ->
  316. {true, Info};
  317. {error, Reason} ->
  318. ?SLOG(warning, Reason),
  319. false
  320. end
  321. end,
  322. filelib:wildcard(Pattern)
  323. ),
  324. do_list(configured(), All).
  325. %%--------------------------------------------------------------------
  326. %% Package utils
  327. -spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
  328. {ok, map() | ?plugin_without_config_schema}
  329. | {error, any()}.
  330. decode_plugin_config_map(NameVsn, AvroJsonMap) ->
  331. case with_plugin_avsc(NameVsn) of
  332. true ->
  333. case emqx_plugins_serde:lookup_serde(NameVsn) of
  334. {error, not_found} ->
  335. Reason = "plugin_config_schema_serde_not_found",
  336. ?SLOG(error, #{
  337. msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
  338. }),
  339. {error, Reason};
  340. {ok, _Serde} ->
  341. do_decode_plugin_config_map(NameVsn, AvroJsonMap)
  342. end;
  343. false ->
  344. ?SLOG(debug, #{name_vsn => NameVsn, plugin_with_avro_schema => false}),
  345. {ok, ?plugin_without_config_schema}
  346. end.
  347. do_decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
  348. do_decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
  349. do_decode_plugin_config_map(NameVsn, AvroJsonBin) ->
  350. case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
  351. {ok, Config} -> {ok, Config};
  352. {error, ReasonMap} -> {error, ReasonMap}
  353. end.
  354. -spec with_plugin_avsc(name_vsn()) -> boolean().
  355. with_plugin_avsc(NameVsn) ->
  356. case read_plugin_info(NameVsn, #{fill_readme => false}) of
  357. {ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) ->
  358. WithAvsc;
  359. _ ->
  360. false
  361. end.
  362. get_config_interal(Key, Default) when is_atom(Key) ->
  363. get_config_interal([Key], Default);
  364. get_config_interal(Path, Default) ->
  365. emqx_conf:get([?CONF_ROOT | Path], Default).
  366. put_config_internal(Key, Value) ->
  367. do_put_config_internal(Key, Value, _ConfLocation = local).
  368. -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
  369. get_tar(NameVsn) ->
  370. TarGz = pkg_file_path(NameVsn),
  371. case file:read_file(TarGz) of
  372. {ok, Content} ->
  373. {ok, Content};
  374. {error, _} ->
  375. case maybe_create_tar(NameVsn, TarGz, install_dir()) of
  376. ok ->
  377. file:read_file(TarGz);
  378. Err ->
  379. Err
  380. end
  381. end.
  382. %%--------------------------------------------------------------------
  383. %% Internal
  384. %%--------------------------------------------------------------------
  385. maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
  386. maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  387. maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
  388. case filelib:wildcard(filename:join(plugin_dir(NameVsn), "**")) of
  389. [_ | _] = PluginFiles ->
  390. InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
  391. PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
  392. erl_tar:create(TarGzName, PluginFiles1, [compressed]);
  393. _ ->
  394. {error, plugin_not_found}
  395. end.
  396. write_tar_file_content(BaseDir, TarContent) ->
  397. lists:foreach(
  398. fun({Name, Bin}) ->
  399. Filename = filename:join(BaseDir, Name),
  400. ok = filelib:ensure_dir(Filename),
  401. ok = file:write_file(Filename, Bin)
  402. end,
  403. TarContent
  404. ).
  405. delete_tar_file_content(BaseDir, TarContent) ->
  406. lists:foreach(
  407. fun({Name, _}) ->
  408. Filename = filename:join(BaseDir, Name),
  409. case filelib:is_file(Filename) of
  410. true ->
  411. TopDirOrFile = top_dir(BaseDir, Filename),
  412. ok = file:del_dir_r(TopDirOrFile);
  413. false ->
  414. %% probably already deleted
  415. ok
  416. end
  417. end,
  418. TarContent
  419. ).
  420. top_dir(BaseDir0, DirOrFile) ->
  421. BaseDir = normalize_dir(BaseDir0),
  422. case filename:dirname(DirOrFile) of
  423. RockBottom when RockBottom =:= "/" orelse RockBottom =:= "." ->
  424. throw({out_of_bounds, DirOrFile});
  425. BaseDir ->
  426. DirOrFile;
  427. Parent ->
  428. top_dir(BaseDir, Parent)
  429. end.
  430. normalize_dir(Dir) ->
  431. %% Get rid of possible trailing slash
  432. filename:join([Dir, ""]).
  433. -ifdef(TEST).
  434. normalize_dir_test_() ->
  435. [
  436. ?_assertEqual("foo", normalize_dir("foo")),
  437. ?_assertEqual("foo", normalize_dir("foo/")),
  438. ?_assertEqual("/foo", normalize_dir("/foo")),
  439. ?_assertEqual("/foo", normalize_dir("/foo/"))
  440. ].
  441. top_dir_test_() ->
  442. [
  443. ?_assertEqual("base/foo", top_dir("base", filename:join(["base", "foo", "bar"]))),
  444. ?_assertEqual("/base/foo", top_dir("/base", filename:join(["/", "base", "foo", "bar"]))),
  445. ?_assertEqual("/base/foo", top_dir("/base/", filename:join(["/", "base", "foo", "bar"]))),
  446. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
  447. ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
  448. ].
  449. -endif.
  450. do_ensure_installed(NameVsn) ->
  451. TarGz = pkg_file_path(NameVsn),
  452. case erl_tar:extract(TarGz, [compressed, memory]) of
  453. {ok, TarContent} ->
  454. ok = write_tar_file_content(install_dir(), TarContent),
  455. case read_plugin_info(NameVsn, #{}) of
  456. {ok, _} ->
  457. ok;
  458. {error, Reason} ->
  459. ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
  460. ok = delete_tar_file_content(install_dir(), TarContent),
  461. {error, Reason}
  462. end;
  463. {error, {_, enoent}} ->
  464. {error, #{
  465. error_msg => "failed_to_extract_plugin_package",
  466. path => TarGz,
  467. reason => not_found
  468. }};
  469. {error, Reason} ->
  470. {error, #{
  471. error_msg => "bad_plugin_package",
  472. path => TarGz,
  473. reason => Reason
  474. }}
  475. end.
  476. ensure_delete(NameVsn0) ->
  477. NameVsn = bin(NameVsn0),
  478. List = configured(),
  479. put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
  480. ok.
  481. ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
  482. ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  483. ensure_state(NameVsn, Position, State, ConfLocation) ->
  484. case read_plugin_info(NameVsn, #{}) of
  485. {ok, _} ->
  486. Item = #{
  487. name_vsn => NameVsn,
  488. enable => State
  489. },
  490. tryit(
  491. "ensure_state",
  492. fun() -> ensure_configured(Item, Position, ConfLocation) end
  493. );
  494. {error, Reason} ->
  495. ?SLOG(error, #{msg => "ensure_plugin_states_failed", reason => Reason}),
  496. {error, Reason}
  497. end.
  498. ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
  499. Configured = configured(),
  500. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  501. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  502. NewConfigured =
  503. case Rear of
  504. [_ | More] when Position =:= no_move ->
  505. Front ++ [Item | More];
  506. [_ | More] ->
  507. add_new_configured(Front ++ More, Position, Item);
  508. [] ->
  509. add_new_configured(Configured, Position, Item)
  510. end,
  511. ok = put_configured(NewConfigured, ConfLocation).
  512. add_new_configured(Configured, no_move, Item) ->
  513. %% default to rear
  514. add_new_configured(Configured, rear, Item);
  515. add_new_configured(Configured, front, Item) ->
  516. [Item | Configured];
  517. add_new_configured(Configured, rear, Item) ->
  518. Configured ++ [Item];
  519. add_new_configured(Configured, {Action, NameVsn}, Item) ->
  520. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  521. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  522. Rear =:= [] andalso
  523. throw(#{
  524. error_msg => "position_anchor_plugin_not_configured",
  525. hint => "maybe_install_and_configure",
  526. name_vsn => NameVsn
  527. }),
  528. case Action of
  529. before ->
  530. Front ++ [Item | Rear];
  531. behind ->
  532. [Anchor | Rear0] = Rear,
  533. Front ++ [Anchor, Item | Rear0]
  534. end.
  535. maybe_purge_plugin_config(NameVsn) ->
  536. _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn)),
  537. ok.
  538. purge_plugin(NameVsn) ->
  539. Dir = plugin_dir(NameVsn),
  540. purge_plugin_dir(Dir).
  541. purge_plugin_dir(Dir) ->
  542. case file:del_dir_r(Dir) of
  543. ok ->
  544. ?SLOG(info, #{
  545. msg => "purged_plugin_dir",
  546. dir => Dir
  547. });
  548. {error, enoent} ->
  549. ok;
  550. {error, Reason} ->
  551. ?SLOG(error, #{
  552. msg => "failed_to_purge_plugin_dir",
  553. dir => Dir,
  554. reason => Reason
  555. }),
  556. {error, Reason}
  557. end.
  558. %% Make sure configured ones are ordered in front.
  559. do_list([], All) ->
  560. All;
  561. do_list([#{name_vsn := NameVsn} | Rest], All) ->
  562. SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
  563. bin([Name, "-", Vsn]) =/= bin(NameVsn)
  564. end,
  565. case lists:splitwith(SplitF, All) of
  566. {_, []} ->
  567. ?SLOG(warning, #{
  568. msg => "configured_plugin_not_installed",
  569. name_vsn => NameVsn
  570. }),
  571. do_list(Rest, All);
  572. {Front, [I | Rear]} ->
  573. [I | do_list(Rest, Front ++ Rear)]
  574. end.
  575. do_ensure_started(NameVsn) ->
  576. tryit(
  577. "start_plugins",
  578. fun() ->
  579. case ensure_exists_and_installed(NameVsn) of
  580. ok ->
  581. Plugin = do_read_plugin(NameVsn),
  582. ok = load_code_start_apps(NameVsn, Plugin);
  583. {error, plugin_not_found} ->
  584. ?SLOG(error, #{
  585. error_msg => "plugin_not_found",
  586. name_vsn => NameVsn
  587. }),
  588. ok
  589. end
  590. end
  591. ).
  592. %%--------------------------------------------------------------------
  593. %% try the function, catch 'throw' exceptions as normal 'error' return
  594. %% other exceptions with stacktrace logged.
  595. tryit(WhichOp, F) ->
  596. try
  597. F()
  598. catch
  599. throw:ReasonMap ->
  600. %% thrown exceptions are known errors
  601. %% translate to a return value without stacktrace
  602. {error, ReasonMap};
  603. error:Reason:Stacktrace ->
  604. %% unexpected errors, log stacktrace
  605. ?SLOG(warning, #{
  606. msg => "plugin_op_failed",
  607. which_op => WhichOp,
  608. exception => Reason,
  609. stacktrace => Stacktrace
  610. }),
  611. {error, #{
  612. which_op => WhichOp,
  613. exception => Reason,
  614. stacktrace => Stacktrace
  615. }}
  616. end.
  617. %% read plugin info from the JSON file
  618. %% returns {ok, Info} or {error, Reason}
  619. read_plugin_info(NameVsn, Options) ->
  620. tryit(
  621. atom_to_list(?FUNCTION_NAME),
  622. fun() -> {ok, do_read_plugin(NameVsn, Options)} end
  623. ).
  624. do_read_plugin(NameVsn) ->
  625. do_read_plugin(NameVsn, #{}).
  626. do_read_plugin(NameVsn, Option) ->
  627. do_read_plugin(NameVsn, info_file_path(NameVsn), Option).
  628. do_read_plugin(NameVsn, InfoFilePath, Options) ->
  629. {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(),
  630. Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath),
  631. Info1 = plugins_readme(NameVsn, Options, Info0),
  632. plugin_status(NameVsn, Info1).
  633. read_plugin_avsc(NameVsn) ->
  634. read_plugin_avsc(NameVsn, #{read_mode => ?JSON_MAP}).
  635. read_plugin_avsc(NameVsn, Options) ->
  636. tryit(
  637. atom_to_list(?FUNCTION_NAME),
  638. read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options)
  639. ).
  640. read_plugin_i18n(NameVsn) ->
  641. read_plugin_i18n(NameVsn, #{read_mode => ?JSON_MAP}).
  642. read_plugin_i18n(NameVsn, Options) ->
  643. tryit(
  644. atom_to_list(?FUNCTION_NAME),
  645. read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options)
  646. ).
  647. read_plugin_hocon(NameVsn) ->
  648. read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}).
  649. read_plugin_hocon(NameVsn, Options) ->
  650. tryit(
  651. atom_to_list(?FUNCTION_NAME),
  652. read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options)
  653. ).
  654. ensure_exists_and_installed(NameVsn) ->
  655. case filelib:is_dir(plugin_dir(NameVsn)) of
  656. true ->
  657. ok;
  658. false ->
  659. %% Do we have the package, but it's not extracted yet?
  660. case get_tar(NameVsn) of
  661. {ok, TarContent} ->
  662. ok = file:write_file(pkg_file_path(NameVsn), TarContent),
  663. do_ensure_installed(NameVsn);
  664. _ ->
  665. %% If not, try to get it from the cluster.
  666. do_get_from_cluster(NameVsn)
  667. end
  668. end.
  669. do_get_from_cluster(NameVsn) ->
  670. Nodes = [N || N <- mria:running_nodes(), N /= node()],
  671. case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
  672. {ok, TarContent} ->
  673. ok = file:write_file(pkg_file_path(NameVsn), TarContent),
  674. ok = do_ensure_installed(NameVsn);
  675. {error, NodeErrors} when Nodes =/= [] ->
  676. ErrMeta = #{
  677. error_msg => "failed_to_copy_plugin_from_other_nodes",
  678. name_vsn => NameVsn,
  679. node_errors => NodeErrors,
  680. reason => not_found
  681. },
  682. ?SLOG(error, ErrMeta),
  683. {error, ErrMeta};
  684. {error, _} ->
  685. ErrMeta = #{
  686. error_msg => "no_nodes_to_copy_plugin_from",
  687. name_vsn => NameVsn,
  688. reason => not_found
  689. },
  690. ?SLOG(error, ErrMeta),
  691. {error, ErrMeta}
  692. end.
  693. get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
  694. {error, Errors};
  695. get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
  696. case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
  697. {ok, _} = Res ->
  698. Res;
  699. Err ->
  700. get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
  701. end.
  702. get_plugin_config_from_any_node([], _NameVsn, Errors) ->
  703. {error, Errors};
  704. get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
  705. case
  706. emqx_plugins_proto_v2:get_config(
  707. Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
  708. )
  709. of
  710. {ok, _} = Res ->
  711. Res;
  712. Err ->
  713. get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
  714. end.
  715. plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
  716. case file:read_file(readme_file(NameVsn)) of
  717. {ok, Bin} -> Info#{readme => Bin};
  718. _ -> Info#{readme => <<>>}
  719. end;
  720. plugins_readme(_NameVsn, _Options, Info) ->
  721. Info.
  722. plugin_status(NameVsn, Info) ->
  723. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  724. RunningSt =
  725. case application:get_key(AppName, vsn) of
  726. {ok, _} ->
  727. case lists:keyfind(AppName, 1, running_apps()) of
  728. {AppName, _} -> running;
  729. _ -> loaded
  730. end;
  731. undefined ->
  732. stopped
  733. end,
  734. Configured = lists:filtermap(
  735. fun(#{name_vsn := Nv, enable := St}) ->
  736. case bin(Nv) =:= bin(NameVsn) of
  737. true -> {true, St};
  738. false -> false
  739. end
  740. end,
  741. configured()
  742. ),
  743. ConfSt =
  744. case Configured of
  745. [] -> not_configured;
  746. [true] -> enabled;
  747. [false] -> disabled
  748. end,
  749. Info#{
  750. running_status => RunningSt,
  751. config_status => ConfSt
  752. }.
  753. check_plugin(
  754. #{
  755. <<"name">> := Name,
  756. <<"rel_vsn">> := Vsn,
  757. <<"rel_apps">> := Apps,
  758. <<"description">> := _
  759. } = Info,
  760. NameVsn,
  761. FilePath
  762. ) ->
  763. case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
  764. true ->
  765. try
  766. %% assert
  767. [_ | _] = Apps,
  768. %% validate if the list is all <app>-<vsn> strings
  769. lists:foreach(fun(App) -> {ok, _, _} = parse_name_vsn(App) end, Apps)
  770. catch
  771. _:_ ->
  772. throw(#{
  773. error_msg => "bad_rel_apps",
  774. rel_apps => Apps,
  775. hint => "A non-empty string list of app_name-app_vsn format"
  776. })
  777. end,
  778. Info;
  779. false ->
  780. throw(#{
  781. error_msg => "name_vsn_mismatch",
  782. name_vsn => NameVsn,
  783. path => FilePath,
  784. name => Name,
  785. rel_vsn => Vsn
  786. })
  787. end;
  788. check_plugin(_What, NameVsn, File) ->
  789. throw(#{
  790. error_msg => "bad_info_file_content",
  791. mandatory_fields => [rel_vsn, name, rel_apps, description],
  792. name_vsn => NameVsn,
  793. path => File
  794. }).
  795. load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
  796. LibDir = filename:join([install_dir(), RelNameVsn]),
  797. RunningApps = running_apps(),
  798. %% load plugin apps and beam code
  799. AppNames =
  800. lists:map(
  801. fun(AppNameVsn) ->
  802. {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
  803. EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
  804. ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
  805. AppName
  806. end,
  807. Apps
  808. ),
  809. lists:foreach(fun start_app/1, AppNames).
  810. load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
  811. case lists:keyfind(AppName, 1, RunningApps) of
  812. false ->
  813. do_load_plugin_app(AppName, Ebin);
  814. {_, Vsn} ->
  815. case bin(Vsn) =:= bin(AppVsn) of
  816. true ->
  817. %% already started on the exact version
  818. ok;
  819. false ->
  820. %% running but a different version
  821. ?SLOG(warning, #{
  822. msg => "plugin_app_already_running",
  823. name => AppName,
  824. running_vsn => Vsn,
  825. loading_vsn => AppVsn
  826. })
  827. end
  828. end.
  829. do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
  830. do_load_plugin_app(AppName, binary_to_list(Ebin));
  831. do_load_plugin_app(AppName, Ebin) ->
  832. _ = code:add_patha(Ebin),
  833. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  834. lists:foreach(
  835. fun(BeamFile) ->
  836. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  837. _ = code:purge(Module),
  838. case code:load_file(Module) of
  839. {module, _} ->
  840. ok;
  841. {error, Reason} ->
  842. throw(#{
  843. error_msg => "failed_to_load_plugin_beam",
  844. path => BeamFile,
  845. reason => Reason
  846. })
  847. end
  848. end,
  849. Modules
  850. ),
  851. case application:load(AppName) of
  852. ok ->
  853. ok;
  854. {error, {already_loaded, _}} ->
  855. ok;
  856. {error, Reason} ->
  857. throw(#{
  858. error_msg => "failed_to_load_plugin_app",
  859. name => AppName,
  860. reason => Reason
  861. })
  862. end.
  863. start_app(App) ->
  864. case application:ensure_all_started(App) of
  865. {ok, Started} ->
  866. case Started =/= [] of
  867. true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
  868. false -> ok
  869. end,
  870. ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
  871. ok;
  872. {error, {ErrApp, Reason}} ->
  873. throw(#{
  874. error_msg => "failed_to_start_plugin_app",
  875. app => App,
  876. err_app => ErrApp,
  877. reason => Reason
  878. })
  879. end.
  880. %% Stop all apps installed by the plugin package,
  881. %% but not the ones shared with others.
  882. ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
  883. %% load plugin apps and beam code
  884. AppsToStop =
  885. lists:map(
  886. fun(NameVsn) ->
  887. {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
  888. AppName
  889. end,
  890. Apps
  891. ),
  892. case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
  893. {ok, []} ->
  894. %% all apps stopped
  895. ok;
  896. {ok, Left} ->
  897. ?SLOG(warning, #{
  898. msg => "unabled_to_stop_plugin_apps",
  899. apps => Left,
  900. reason => "running_apps_still_depends_on_this_apps"
  901. }),
  902. ok;
  903. {error, Reason} ->
  904. {error, Reason}
  905. end.
  906. stop_apps(Apps) ->
  907. RunningApps = running_apps(),
  908. case do_stop_apps(Apps, [], RunningApps) of
  909. %% all stopped
  910. {ok, []} -> {ok, []};
  911. %% no progress
  912. {ok, Remain} when Remain =:= Apps -> {ok, Apps};
  913. %% try again
  914. {ok, Remain} -> stop_apps(Remain)
  915. end.
  916. do_stop_apps([], Remain, _AllApps) ->
  917. {ok, lists:reverse(Remain)};
  918. do_stop_apps([App | Apps], Remain, RunningApps) ->
  919. case is_needed_by_any(App, RunningApps) of
  920. true ->
  921. do_stop_apps(Apps, [App | Remain], RunningApps);
  922. false ->
  923. ok = stop_app(App),
  924. do_stop_apps(Apps, Remain, RunningApps)
  925. end.
  926. stop_app(App) ->
  927. case application:stop(App) of
  928. ok ->
  929. ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
  930. ok = unload_moudle_and_app(App);
  931. {error, {not_started, App}} ->
  932. ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
  933. ok = unload_moudle_and_app(App);
  934. {error, Reason} ->
  935. throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason})
  936. end.
  937. unload_moudle_and_app(App) ->
  938. case application:get_key(App, modules) of
  939. {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
  940. _ -> ok
  941. end,
  942. _ = application:unload(App),
  943. ok.
  944. is_needed_by_any(AppToStop, RunningApps) ->
  945. lists:any(
  946. fun({RunningApp, _RunningAppVsn}) ->
  947. is_needed_by(AppToStop, RunningApp)
  948. end,
  949. RunningApps
  950. ).
  951. is_needed_by(AppToStop, AppToStop) ->
  952. false;
  953. is_needed_by(AppToStop, RunningApp) ->
  954. case application:get_key(RunningApp, applications) of
  955. {ok, Deps} -> lists:member(AppToStop, Deps);
  956. undefined -> false
  957. end.
  958. do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) ->
  959. do_put_config_internal([Key], Value, ConfLocation);
  960. do_put_config_internal(Path, Values, _ConfLocation = local) when is_list(Path) ->
  961. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  962. %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
  963. case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
  964. {ok, _} -> ok;
  965. Error -> Error
  966. end;
  967. do_put_config_internal(Path, Values, _ConfLocation = global) when is_list(Path) ->
  968. Opts = #{rawconf_with_defaults => true, override_to => cluster},
  969. case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
  970. {ok, _} -> ok;
  971. Error -> Error
  972. end.
  973. %%--------------------------------------------------------------------
  974. %% `emqx_config_handler' API
  975. %%--------------------------------------------------------------------
  976. post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
  977. NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
  978. OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
  979. #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
  980. maps:foreach(fun enable_disable_plugin/2, Changed),
  981. ok;
  982. post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
  983. ok.
  984. enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
  985. %% errors are already logged in this fn
  986. _ = ensure_stopped(NameVsn),
  987. ok;
  988. enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
  989. %% errors are already logged in this fn
  990. _ = ensure_started(NameVsn),
  991. ok;
  992. enable_disable_plugin(_NameVsn, _Diff) ->
  993. ok.
  994. %%--------------------------------------------------------------------
  995. %% Helper functions
  996. %%--------------------------------------------------------------------
  997. install_dir() ->
  998. get_config_interal(install_dir, "").
  999. put_configured(Configured) ->
  1000. put_configured(Configured, _ConfLocation = local).
  1001. put_configured(Configured, ConfLocation) ->
  1002. ok = do_put_config_internal(states, bin_key(Configured), ConfLocation).
  1003. configured() ->
  1004. get_config_interal(states, []).
  1005. for_plugins(ActionFun) ->
  1006. case lists:flatmap(ActionFun, configured()) of
  1007. [] ->
  1008. ok;
  1009. Errors ->
  1010. ErrMeta = #{function => ActionFun, errors => Errors},
  1011. ?tp(
  1012. for_plugins_action_error_occurred,
  1013. ErrMeta
  1014. ),
  1015. ?SLOG(error, ErrMeta),
  1016. ok
  1017. end.
  1018. maybe_post_op_after_installed(NameVsn0) ->
  1019. NameVsn = wrap_to_list(NameVsn0),
  1020. _ = maybe_load_config_schema(NameVsn),
  1021. ok = maybe_ensure_state(NameVsn).
  1022. maybe_ensure_state(NameVsn) ->
  1023. EnsureStateFun = fun(#{name_vsn := NV, enable := Bool}, AccIn) ->
  1024. case NV of
  1025. NameVsn ->
  1026. %% Configured, using existed cluster config
  1027. _ = ensure_state(NV, no_move, Bool, global),
  1028. AccIn#{ensured => true};
  1029. _ ->
  1030. AccIn
  1031. end
  1032. end,
  1033. case lists:foldl(EnsureStateFun, #{ensured => false}, configured()) of
  1034. #{ensured := true} ->
  1035. ok;
  1036. #{ensured := false} ->
  1037. ?SLOG(info, #{msg => "plugin_not_configured", name_vsn => NameVsn}),
  1038. %% Clean installation, no config, ensure with `Enable = false`
  1039. _ = ensure_state(NameVsn, no_move, false, global),
  1040. ok
  1041. end,
  1042. ok.
  1043. maybe_load_config_schema(NameVsn) ->
  1044. AvscPath = avsc_file_path(NameVsn),
  1045. _ =
  1046. with_plugin_avsc(NameVsn) andalso
  1047. filelib:is_regular(AvscPath) andalso
  1048. do_load_config_schema(NameVsn, AvscPath),
  1049. _ = maybe_create_config_dir(NameVsn).
  1050. do_load_config_schema(NameVsn, AvscPath) ->
  1051. case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of
  1052. ok -> ok;
  1053. {error, already_exists} -> ok;
  1054. {error, _Reason} -> ok
  1055. end.
  1056. maybe_create_config_dir(NameVsn) ->
  1057. with_plugin_avsc(NameVsn) andalso
  1058. do_create_config_dir(NameVsn).
  1059. do_create_config_dir(NameVsn) ->
  1060. case plugin_config_dir(NameVsn) of
  1061. {error, Reason} ->
  1062. {error, {gen_config_dir_failed, Reason}};
  1063. ConfigDir ->
  1064. case filelib:ensure_path(ConfigDir) of
  1065. ok ->
  1066. %% get config from other nodes or get from tarball
  1067. _ = maybe_ensure_plugin_config(NameVsn),
  1068. ok;
  1069. {error, Reason} ->
  1070. ?SLOG(warning, #{
  1071. msg => "failed_to_create_plugin_config_dir",
  1072. dir => ConfigDir,
  1073. reason => Reason
  1074. }),
  1075. {error, {mkdir_failed, ConfigDir, Reason}}
  1076. end
  1077. end.
  1078. -spec maybe_ensure_plugin_config(name_vsn()) -> ok.
  1079. maybe_ensure_plugin_config(NameVsn) ->
  1080. maybe
  1081. true ?= with_plugin_avsc(NameVsn),
  1082. _ = ensure_plugin_config(NameVsn)
  1083. else
  1084. _ -> ok
  1085. end.
  1086. -spec ensure_plugin_config(name_vsn()) -> ok.
  1087. ensure_plugin_config(NameVsn) ->
  1088. %% fetch plugin hocon config from cluster
  1089. Nodes = [N || N <- mria:running_nodes(), N /= node()],
  1090. ensure_plugin_config(NameVsn, Nodes).
  1091. -spec ensure_plugin_config(name_vsn(), list()) -> ok.
  1092. ensure_plugin_config(NameVsn, []) ->
  1093. ?SLOG(debug, #{
  1094. msg => "default_plugin_config_used",
  1095. name_vsn => NameVsn,
  1096. reason => "no_other_running_nodes"
  1097. }),
  1098. cp_default_config_file(NameVsn);
  1099. ensure_plugin_config(NameVsn, Nodes) ->
  1100. case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
  1101. {ok, ConfigMap} when is_map(ConfigMap) ->
  1102. HoconBin = hocon_pp:do(ConfigMap, #{}),
  1103. Path = plugin_config_file(NameVsn),
  1104. ok = filelib:ensure_dir(Path),
  1105. ok = file:write_file(Path, HoconBin),
  1106. ensure_config_map(NameVsn);
  1107. _ ->
  1108. ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
  1109. %% otherwise cp default hocon file
  1110. %% i.e. Clean installation
  1111. cp_default_config_file(NameVsn)
  1112. end.
  1113. -spec cp_default_config_file(name_vsn()) -> ok.
  1114. cp_default_config_file(NameVsn) ->
  1115. %% always copy default hocon file into config dir when can not get config from other nodes
  1116. Source = default_plugin_config_file(NameVsn),
  1117. Destination = plugin_config_file(NameVsn),
  1118. maybe
  1119. true ?= filelib:is_regular(Source),
  1120. %% destination path not existed (not configured)
  1121. true ?= (not filelib:is_regular(Destination)),
  1122. ok = filelib:ensure_dir(Destination),
  1123. case file:copy(Source, Destination) of
  1124. {ok, _} ->
  1125. ensure_config_map(NameVsn);
  1126. {error, Reason} ->
  1127. ?SLOG(warning, #{
  1128. msg => "failed_to_copy_plugin_default_hocon_config",
  1129. source => Source,
  1130. destination => Destination,
  1131. reason => Reason
  1132. })
  1133. end
  1134. else
  1135. _ -> ensure_config_map(NameVsn)
  1136. end.
  1137. ensure_config_map(NameVsn) ->
  1138. case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
  1139. {ok, ConfigJsonMap} ->
  1140. case with_plugin_avsc(NameVsn) of
  1141. true ->
  1142. do_ensure_config_map(NameVsn, ConfigJsonMap);
  1143. false ->
  1144. put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema)
  1145. end;
  1146. _ ->
  1147. ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
  1148. ok
  1149. end.
  1150. do_ensure_config_map(NameVsn, ConfigJsonMap) ->
  1151. case decode_plugin_config_map(NameVsn, ConfigJsonMap) of
  1152. {ok, AvroValue} ->
  1153. put_config(NameVsn, ConfigJsonMap, AvroValue);
  1154. {error, Reason} ->
  1155. ?SLOG(error, #{
  1156. msg => "plugin_config_validation_failed",
  1157. name_vsn => NameVsn,
  1158. reason => Reason
  1159. }),
  1160. ok
  1161. end.
  1162. %% @private Backup the current config to a file with a timestamp suffix and
  1163. %% then save the new config to the config file.
  1164. backup_and_write_hocon_bin(NameVsn, HoconBin) ->
  1165. %% this may fail, but we don't care
  1166. %% e.g. read-only file system
  1167. Path = plugin_config_file(NameVsn),
  1168. _ = filelib:ensure_dir(Path),
  1169. TmpFile = Path ++ ".tmp",
  1170. case file:write_file(TmpFile, HoconBin) of
  1171. ok ->
  1172. backup_and_replace(Path, TmpFile);
  1173. {error, Reason} ->
  1174. ?SLOG(error, #{
  1175. msg => "failed_to_save_plugin_conf_file",
  1176. hint =>
  1177. "The updated cluster config is not saved on this node, please check the file system.",
  1178. filename => TmpFile,
  1179. reason => Reason
  1180. }),
  1181. %% e.g. read-only, it's not the end of the world
  1182. ok
  1183. end.
  1184. backup_and_replace(Path, TmpPath) ->
  1185. Backup = Path ++ "." ++ now_time() ++ ".bak",
  1186. case file:rename(Path, Backup) of
  1187. ok ->
  1188. ok = file:rename(TmpPath, Path),
  1189. ok = prune_backup_files(Path);
  1190. {error, enoent} ->
  1191. %% not created yet
  1192. ok = file:rename(TmpPath, Path);
  1193. {error, Reason} ->
  1194. ?SLOG(warning, #{
  1195. msg => "failed_to_backup_plugin_conf_file",
  1196. filename => Backup,
  1197. reason => Reason
  1198. }),
  1199. ok
  1200. end.
  1201. prune_backup_files(Path) ->
  1202. Files0 = filelib:wildcard(Path ++ ".*"),
  1203. Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
  1204. Files = lists:filter(fun(F) -> re:run(F, Re) =/= nomatch end, Files0),
  1205. Sorted = lists:reverse(lists:sort(Files)),
  1206. {_Keeps, Deletes} = lists:split(min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)), Sorted),
  1207. lists:foreach(
  1208. fun(F) ->
  1209. case file:delete(F) of
  1210. ok ->
  1211. ok;
  1212. {error, Reason} ->
  1213. ?SLOG(warning, #{
  1214. msg => "failed_to_delete_backup_plugin_conf_file",
  1215. filename => F,
  1216. reason => Reason
  1217. }),
  1218. ok
  1219. end
  1220. end,
  1221. Deletes
  1222. ).
  1223. read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) ->
  1224. fun() ->
  1225. case file:read_file(Path) of
  1226. {ok, Bin} ->
  1227. {ok, Bin};
  1228. {error, Reason} ->
  1229. ErrMeta = #{error_msg => ErrMsg, reason => Reason},
  1230. throw(ErrMeta)
  1231. end
  1232. end;
  1233. read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
  1234. fun() ->
  1235. case hocon:load(Path, #{format => richmap}) of
  1236. {ok, RichMap} ->
  1237. {ok, hocon_maps:ensure_plain(RichMap)};
  1238. {error, Reason} ->
  1239. ErrMeta = #{error_msg => ErrMsg, reason => Reason},
  1240. throw(ErrMeta)
  1241. end
  1242. end.
  1243. %% Directorys
  1244. -spec plugin_dir(name_vsn()) -> string().
  1245. plugin_dir(NameVsn) ->
  1246. wrap_to_list(filename:join([install_dir(), NameVsn])).
  1247. -spec plugin_priv_dir(name_vsn()) -> string().
  1248. plugin_priv_dir(NameVsn) ->
  1249. case read_plugin_info(NameVsn, #{fill_readme => false}) of
  1250. {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
  1251. AppDir = make_name_vsn_string(Name, Vsn),
  1252. wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
  1253. _ ->
  1254. wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
  1255. end.
  1256. -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
  1257. plugin_config_dir(NameVsn) ->
  1258. case parse_name_vsn(NameVsn) of
  1259. {ok, NameAtom, _Vsn} ->
  1260. wrap_to_list(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]));
  1261. {error, Reason} ->
  1262. ?SLOG(warning, #{
  1263. msg => "failed_to_generate_plugin_config_dir_for_plugin",
  1264. plugin_namevsn => NameVsn,
  1265. reason => Reason
  1266. }),
  1267. {error, Reason}
  1268. end.
  1269. %% Files
  1270. -spec pkg_file_path(name_vsn()) -> string().
  1271. pkg_file_path(NameVsn) ->
  1272. wrap_to_list(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])).
  1273. -spec info_file_path(name_vsn()) -> string().
  1274. info_file_path(NameVsn) ->
  1275. wrap_to_list(filename:join([plugin_dir(NameVsn), "release.json"])).
  1276. -spec avsc_file_path(name_vsn()) -> string().
  1277. avsc_file_path(NameVsn) ->
  1278. wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])).
  1279. -spec plugin_config_file(name_vsn()) -> string().
  1280. plugin_config_file(NameVsn) ->
  1281. wrap_to_list(filename:join([plugin_config_dir(NameVsn), "config.hocon"])).
  1282. %% should only used when plugin installing
  1283. -spec default_plugin_config_file(name_vsn()) -> string().
  1284. default_plugin_config_file(NameVsn) ->
  1285. wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])).
  1286. -spec i18n_file_path(name_vsn()) -> string().
  1287. i18n_file_path(NameVsn) ->
  1288. wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])).
  1289. -spec readme_file(name_vsn()) -> string().
  1290. readme_file(NameVsn) ->
  1291. wrap_to_list(filename:join([plugin_dir(NameVsn), "README.md"])).
  1292. running_apps() ->
  1293. lists:map(
  1294. fun({N, _, V}) ->
  1295. {N, V}
  1296. end,
  1297. application:which_applications(infinity)
  1298. ).
  1299. %% @private This is the same human-readable timestamp format as
  1300. %% hocon-cli generated app.<time>.config file name.
  1301. now_time() ->
  1302. Ts = os:system_time(millisecond),
  1303. {{Y, M, D}, {HH, MM, SS}} = calendar:system_time_to_local_time(Ts, millisecond),
  1304. Res = io_lib:format(
  1305. "~0p.~2..0b.~2..0b.~2..0b.~2..0b.~2..0b.~3..0b",
  1306. [Y, M, D, HH, MM, SS, Ts rem 1000]
  1307. ),
  1308. lists:flatten(Res).
  1309. bin_key(Map) when is_map(Map) ->
  1310. maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
  1311. bin_key(List = [#{} | _]) ->
  1312. lists:map(fun(M) -> bin_key(M) end, List);
  1313. bin_key(Term) ->
  1314. Term.
  1315. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  1316. bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
  1317. bin(B) when is_binary(B) -> B.
  1318. wrap_to_list(Path) ->
  1319. binary_to_list(iolist_to_binary(Path)).