emqx_plugins.erl 50 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -feature(maybe_expr, enable).
  18. -include("emqx_plugins.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -include_lib("snabbkaffe/include/trace.hrl").
  21. -ifdef(TEST).
  22. -include_lib("eunit/include/eunit.hrl").
  23. -endif.
  24. -export([
  25. describe/1,
  26. plugin_schema_json/1,
  27. plugin_i18n_json/1,
  28. raw_plugin_config_content/1,
  29. parse_name_vsn/1,
  30. make_name_vsn_string/2
  31. ]).
  32. %% Package operations
  33. -export([
  34. ensure_installed/0,
  35. ensure_installed/1,
  36. ensure_installed/2,
  37. ensure_uninstalled/1,
  38. ensure_enabled/1,
  39. ensure_enabled/2,
  40. ensure_enabled/3,
  41. ensure_disabled/1,
  42. purge/1,
  43. delete_package/1
  44. ]).
  45. %% Plugin runtime management
  46. -export([
  47. ensure_started/0,
  48. ensure_started/1,
  49. ensure_stopped/0,
  50. ensure_stopped/1,
  51. restart/1,
  52. list/0
  53. ]).
  54. %% Plugin config APIs
  55. -export([
  56. get_config/1,
  57. get_config/2,
  58. get_config/3,
  59. get_config/4,
  60. put_config/3
  61. ]).
  62. %% Package utils
  63. -export([
  64. decode_plugin_config_map/2,
  65. install_dir/0,
  66. avsc_file_path/1,
  67. with_plugin_avsc/1
  68. ]).
  69. %% `emqx_config_handler' API
  70. -export([
  71. post_config_update/5
  72. ]).
  73. %% RPC call
  74. -export([get_tar/1]).
  75. %% Internal export
  76. -export([
  77. ensure_config_map/1,
  78. do_ensure_started/1
  79. ]).
  80. %% for test cases
  81. -export([put_config_internal/2]).
  82. -ifdef(TEST).
  83. -compile(export_all).
  84. -compile(nowarn_export_all).
  85. -endif.
  86. %% Defines
  87. -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
  88. -define(RAW_BIN, binary).
  89. -define(JSON_MAP, json_map).
  90. -define(MAX_KEEP_BACKUP_CONFIGS, 10).
  91. %%--------------------------------------------------------------------
  92. %% APIs
  93. %%--------------------------------------------------------------------
  %% @doc Describe a plugin.
  %% Reads the plugin info for `NameVsn' with the `fill_readme' option
  %% switched on, so the returned info also carries a `readme' entry.
  -spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
  describe(NameVsn) ->
      Options = #{fill_readme => true},
      read_plugin_info(NameVsn, Options).
  %% @doc Read the plugin's avsc schema file, parsed as a JSON map.
  -spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
  plugin_schema_json(NameVsn) ->
      %% Thin public wrapper; reading and error translation happen in the helper.
      read_plugin_avsc(NameVsn).
  %% @doc Read the plugin's i18n file, parsed as a JSON map.
  -spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
  plugin_i18n_json(NameVsn) ->
      %% Thin public wrapper; reading and error translation happen in the helper.
      read_plugin_i18n(NameVsn).
  %% @doc Read the plugin's HOCON config file as a raw binary.
  -spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
  raw_plugin_config_content(NameVsn) ->
      %% Thin public wrapper; reading and error translation happen in the helper.
      read_plugin_hocon(NameVsn).
  %% @doc Split a `<name>-<vsn>' string (or binary) at the FIRST dash.
  %% Returns `{ok, AppNameAtom, VsnString}', or `{error, "bad_name_vsn"}'
  %% when the input contains no dash at all.
  parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
      parse_name_vsn(binary_to_list(NameVsn));
  parse_name_vsn(NameVsn) when is_list(NameVsn) ->
      IsNotDash = fun(Char) -> Char =/= $- end,
      {NamePart, DashAndVsn} = lists:splitwith(IsNotDash, NameVsn),
      case DashAndVsn of
          [$- | VsnPart] ->
              {ok, list_to_atom(NamePart), VsnPart};
          _ ->
              {error, "bad_name_vsn"}
      end.
  %% @doc Build the `<name>-<vsn>' string from its two parts.
  %% Both parts may be any iodata; the result is always a flat string.
  make_name_vsn_string(Name, Vsn) ->
      Joined = iolist_to_binary([Name, "-", Vsn]),
      binary_to_list(Joined).
  %% Find the single `<app>-<vsn>' entry in Apps that starts with AppName.
  %% Zero matches or more than one match both yield `{error, not_found}'.
  app_dir(AppName, Apps) ->
      Candidates = [
          AppNameVsn
       || AppNameVsn <- Apps, string:prefix(AppNameVsn, AppName) =/= nomatch
      ],
      case Candidates of
          [Single] -> {ok, Single};
          _ -> {error, not_found}
      end.
  128. %%--------------------------------------------------------------------
  129. %% Package operations
  %% @doc Ensure all configured plugins are installed.
  %% For each plugin entry the fun yields `[]' on success and
  %% `[{NameVsn, Reason}]' on failure; for_plugins/1 consumes those results.
  -spec ensure_installed() -> ok.
  ensure_installed() ->
      InstallOne = fun(#{name_vsn := NameVsn}) ->
          case ensure_installed(NameVsn) of
              {error, Reason} -> [{NameVsn, Reason}];
              ok -> []
          end
      end,
      ok = for_plugins(InstallOne).
  %% @doc Install a .tar.gz package placed in install_dir.
  %% If the plugin info is already readable, the plugin counts as installed
  %% and only its config is (re)ensured. Otherwise any leftovers are purged
  %% and the package is installed from the local tarball or from the cluster.
  %% Always returns `ok' or `{error, Map}', as the spec promises.
  -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  ensure_installed(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {ok, _} ->
              %% The helper's result is deliberately ignored; return `ok'
              %% explicitly so callers matching `ok | {error, _}' keep working.
              _ = maybe_ensure_plugin_config(NameVsn, ?normal),
              ok;
          {error, _} ->
              ok = purge(NameVsn),
              case ensure_exists_and_installed(NameVsn) of
                  ok ->
                      _ = maybe_post_op_after_installed(NameVsn, ?normal),
                      ok;
                  {error, _Reason} = Err ->
                      Err
              end
      end.
  %% Install in `?fresh_install' mode: unlike ensure_installed/1 this neither
  %% checks for an existing installation nor purges anything beforehand.
  ensure_installed(NameVsn, ?fresh_install = Mode) ->
      case ensure_exists_and_installed(NameVsn) of
          {error, _Reason} = InstallError ->
              InstallError;
          ok ->
              maybe_post_op_after_installed(NameVsn, Mode),
              ok
      end.
  %% @doc Ensure the files and directories of the given plugin are deleted.
  %% An error is returned while the plugin is not stopped, or while it is
  %% still enabled in the config. In every other case (including an
  %% unreadable plugin info) the plugin is purged and dropped from the
  %% configured list.
  -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  ensure_uninstalled(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {ok, #{running_status := Status}} when Status =/= stopped ->
              StillRunning = #{
                  msg => "bad_plugin_running_status",
                  hint => "stop_the_plugin_first"
              },
              {error, StillRunning};
          {ok, #{config_status := enabled}} ->
              StillEnabled = #{
                  msg => "bad_plugin_config_status",
                  hint => "disable_the_plugin_first"
              },
              {error, StillEnabled};
          _Otherwise ->
              %% The purge result is intentionally not inspected here.
              _ = purge(NameVsn),
              ensure_delete(NameVsn)
      end.
  %% @doc Ensure a plugin is enabled without moving it.
  %% Uses the `no_move' position: an already configured plugin keeps its
  %% place, a new one is appended to the end of the plugins list.
  -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  ensure_enabled(NameVsn) ->
      Position = no_move,
      ensure_enabled(NameVsn, Position).
  %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  %% The change is written with the `local' config location.
  -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position) ->
      Enabled = true,
      ConfLocation = local,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).
  %% @doc Ensure a plugin is enabled at the given position, writing the
  %% change to the given config location (`local' or `global').
  -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position, ConfLocation) when
      ConfLocation =:= local orelse ConfLocation =:= global
  ->
      Enabled = true,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).
  %% @doc Ensure a plugin is disabled.
  %% The plugin keeps its position (`no_move'); the change is written with
  %% the `local' config location.
  -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  ensure_disabled(NameVsn) ->
      Enabled = false,
      ConfLocation = local,
      ensure_state(NameVsn, no_move, Enabled, ConfLocation).
  %% @doc Delete the extracted plugin directory and drop the cached config.
  %% In case one lib is shared by multiple plugins,
  %% purging one plugin's install dir might delete beams that are loaded.
  %% That should not be a problem, because a shared lib should
  %% reside in all the plugin install dirs.
  %%
  %% Returns `ok' when the directory was removed or did not exist, and
  %% `{error, Reason}' when the recursive delete failed (the spec used to
  %% claim a bare `ok', which did not cover that path).
  -spec purge(name_vsn()) -> ok | {error, term()}.
  purge(NameVsn) ->
      _ = maybe_purge_plugin_config(NameVsn),
      purge_plugin(NameVsn).
  %% @doc Delete the package file and the plugin's registered schema.
  %% A missing package file is treated as success. Any other failure of
  %% `file:delete/1' is logged and returned as `{error, Reason}' (the spec
  %% used to claim a bare `ok', which did not cover that path).
  -spec delete_package(name_vsn()) -> ok | {error, file:posix() | badarg}.
  delete_package(NameVsn) ->
      File = pkg_file_path(NameVsn),
      _ = emqx_plugins_serde:delete_schema(NameVsn),
      case file:delete(File) of
          ok ->
              %% NOTE(review): the msg says "dir" although a package file is
              %% deleted here; kept unchanged in case log consumers match on it.
              ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
              ok;
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_delete_package_file",
                  path => File,
                  reason => Reason
              }),
              {error, Reason}
      end.
  230. %%--------------------------------------------------------------------
  231. %% Plugin runtime management
  %% @doc Ensure all configured and enabled plugins are started.
  %% Disabled entries are only logged at debug level. Start failures are
  %% reported to for_plugins/1 as `[{NameVsn, Reason}]'.
  -spec ensure_started() -> ok.
  ensure_started() ->
      StartOne = fun
          (#{name_vsn := NameVsn, enable := false}) ->
              ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
              [];
          (#{name_vsn := NameVsn, enable := true}) ->
              case do_ensure_started(NameVsn) of
                  {error, Reason} -> [{NameVsn, Reason}];
                  ok -> []
              end
      end,
      ok = for_plugins(StartOne).
  %% @doc Start a plugin from Management API or CLI.
  %% The input is a `<name>-<vsn>' string. A failure is logged at error
  %% level and returned unchanged.
  -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  ensure_started(NameVsn) ->
      case do_ensure_started(NameVsn) of
          {error, ReasonMap} = StartError ->
              ?SLOG(error, ReasonMap#{msg => "failed_to_start_plugin"}),
              StartError;
          ok ->
              ok
      end.
  %% @doc Stop all enabled plugins before the broker stops.
  %% Disabled entries are only logged at debug level. Stop failures are
  %% reported to for_plugins/1 as `[{NameVsn, Reason}]'.
  -spec ensure_stopped() -> ok.
  ensure_stopped() ->
      StopOne = fun
          (#{name_vsn := NameVsn, enable := false}) ->
              ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
              [];
          (#{name_vsn := NameVsn, enable := true}) ->
              case ensure_stopped(NameVsn) of
                  {error, Reason} -> [{NameVsn, Reason}];
                  ok -> []
              end
      end,
      ok = for_plugins(StopOne).
  %% @doc Stop a plugin from Management API or CLI.
  %% Reads the plugin info and stops its apps; throws and errors raised on
  %% the way are turned into `{error, _}' by tryit/2.
  -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  ensure_stopped(NameVsn) ->
      StopPlugin = fun() ->
          ensure_apps_stopped(do_read_plugin(NameVsn))
      end,
      tryit("stop_plugin", StopPlugin).
  %% Same as get_config/3, but takes the plugin name and version separately.
  get_config(Name, Vsn, Opt, Default) ->
      NameVsn = make_name_vsn_string(Name, Vsn),
      get_config(NameVsn, Opt, Default).
  %% @doc Get the plugin config in map format, defaulting to an empty map.
  -spec get_config(name_vsn()) ->
      {ok, plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn) ->
      Default = #{},
      get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default).
  %% @doc Get the plugin config in the requested format.
  %% The binary format reads the raw HOCON file and has no default value;
  %% the map format falls back to an empty map.
  -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
      {ok, raw_plugin_config_content() | plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
      get_config_bin(NameVsn);
  get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
      get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}).
  %% @doc Get the plugin config map from persistent_term.
  %% A default value is only supported for the map format. The lookup key
  %% always uses the binary form of NameVsn.
  -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
      {ok, plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)),
      Config = persistent_term:get(Key, Default),
      {ok, Config}.
  %% Read the raw HOCON config; there is no default value in binary format.
  get_config_bin(NameVsn) ->
      case read_plugin_hocon(NameVsn) of
          {error, _Reason} = ReadError -> ReadError;
          {ok, _Content} = Ok -> Ok
      end.
  %% @doc Update a plugin's config.
  %% RPC call from Management API or CLI.
  %% The caller must pass a config JSON map that is already valid: either it
  %% was validated against the plugin's avro schema, or the plugin has no
  %% schema and the caller vouches for it. No validation happens here.
  %% The config is written to the HOCON file first and then cached in
  %% persistent_term under the binary form of NameVsn.
  put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) ->
      put_config(bin(NameVsn), ConfigJsonMap, AvroValue);
  put_config(NameVsn, ConfigJsonMap, _AvroValue) ->
      ok = backup_and_write_hocon_bin(NameVsn, hocon_pp:do(ConfigJsonMap, #{})),
      %% TODO: callback in plugin's on_config_changed (config update by mgmt API)
      %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(NameVsn),
      ok = persistent_term:put(Key, ConfigJsonMap),
      ok.
  %% @doc Stop and then start the plugin.
  %% The plugin is only started again when stopping succeeded.
  restart(NameVsn) ->
      case ensure_stopped(NameVsn) of
          {error, _Reason} = StopError -> StopError;
          ok -> ensure_started(NameVsn)
      end.
  %% @doc List all installed plugins,
  %% including the ones that are installed but not enabled in config.
  %% Every `release.json' one level below install_dir marks an installed
  %% plugin; plugins whose info cannot be read are logged and left out.
  -spec list() -> [plugin_info()].
  list() ->
      InfoFiles = filelib:wildcard(filename:join([install_dir(), "*", "release.json"])),
      ReadInfo = fun(InfoFile) ->
          %% The directory holding release.json is named after the plugin.
          [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
          case read_plugin_info(NameVsn, #{}) of
              {error, Reason} ->
                  ?SLOG(warning, Reason#{msg => "failed_to_read_plugin_info"}),
                  false;
              {ok, Info} ->
                  {true, Info}
          end
      end,
      Installed = lists:filtermap(ReadInfo, InfoFiles),
      do_list(configured(), Installed).
  347. %%--------------------------------------------------------------------
  348. %% Package utils
  %% @doc Decode a plugin config against the plugin's avro schema.
  %% A plugin without a config schema yields
  %% `{ok, ?plugin_without_config_schema}'. A plugin that declares a schema
  %% but has no registered serde yields an error.
  -spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
      {ok, map() | ?plugin_without_config_schema}
      | {error, any()}.
  decode_plugin_config_map(NameVsn, AvroJsonMap) ->
      case with_plugin_avsc(NameVsn) of
          false ->
              ?SLOG(debug, #{
                  msg => "plugin_without_config_schema",
                  name_vsn => NameVsn
              }),
              {ok, ?plugin_without_config_schema};
          true ->
              case emqx_plugins_serde:lookup_serde(NameVsn) of
                  {ok, _Serde} ->
                      do_decode_plugin_config_map(NameVsn, AvroJsonMap);
                  {error, not_found} ->
                      Reason = "plugin_config_schema_serde_not_found",
                      ?SLOG(error, #{
                          msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
                      }),
                      {error, Reason}
              end
      end.
  %% Decode the config through the plugin's serde. A map is JSON-encoded
  %% first; anything else is passed to the serde as is.
  do_decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
      AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
      do_decode_plugin_config_map(NameVsn, AvroJsonBin);
  do_decode_plugin_config_map(NameVsn, AvroJsonBin) ->
      case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
          {error, _ReasonMap} = DecodeError -> DecodeError;
          {ok, _Config} = Decoded -> Decoded
      end.
  %% @doc Tell whether the plugin declares a config schema.
  %% Returns the boolean `with_config_schema' field of the plugin info, and
  %% `false' when the info is unreadable or the field is missing or not a
  %% boolean.
  -spec with_plugin_avsc(name_vsn()) -> boolean().
  with_plugin_avsc(NameVsn) ->
      case read_plugin_info(NameVsn, #{fill_readme => false}) of
          {ok, #{<<"with_config_schema">> := true}} -> true;
          {ok, #{<<"with_config_schema">> := false}} -> false;
          _ -> false
      end.
  %% Read a value below the plugins config root. A single atom key is
  %% treated as a one-element path.
  get_config_interal(Key, Default) when is_atom(Key) ->
      get_config_interal([Key], Default);
  get_config_interal(Path, Default) ->
      FullPath = [?CONF_ROOT | Path],
      emqx_conf:get(FullPath, Default).
  %% Write a plugins config value using the `local' config location.
  %% Exported for test cases.
  put_config_internal(Key, Value) ->
      ConfLocation = local,
      do_put_config_internal(Key, Value, ConfLocation).
  %% @doc Return the content of the plugin's package tarball (RPC target).
  %% When the package file cannot be read, the tarball is re-created from
  %% the installed plugin directory and then read again.
  %% The spec used to say `{error, any}', i.e. the literal atom `any';
  %% the error reason is in fact an arbitrary term.
  -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any()}.
  get_tar(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case file:read_file(TarGz) of
          {ok, Content} ->
              {ok, Content};
          {error, _} ->
              case maybe_create_tar(NameVsn, TarGz, install_dir()) of
                  ok ->
                      file:read_file(TarGz);
                  Err ->
                      Err
              end
      end.
  407. %%--------------------------------------------------------------------
  408. %% Internal
  409. %%--------------------------------------------------------------------
  %% Re-create the plugin tarball from the files below the plugin's
  %% directory. Archive entry names are the file paths with the install dir
  %% prefix stripped. Returns `{error, plugin_not_found}' when the plugin
  %% directory holds no files.
  maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
      maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
      case filelib:wildcard(filename:join(plugin_dir(NameVsn), "**")) of
          [] ->
              {error, plugin_not_found};
          [_ | _] = Files ->
              %% Exactly one trailing slash, so stripping yields relative names.
              Prefix = string:trim(InstallDir, trailing, "/") ++ "/",
              Entries = lists:map(
                  fun(File) -> {string:prefix(File, Prefix), File} end,
                  Files
              ),
              erl_tar:create(TarGzName, Entries, [compressed])
      end.
  %% Write every `{Name, Bin}' tar entry to a file below BaseDir, creating
  %% parent directories as needed. Any file system error crashes (badmatch).
  %% NOTE(review): entry names are joined onto BaseDir without any check
  %% for `..' segments - confirm that packages are trusted or validated
  %% before they reach this point.
  write_tar_file_content(BaseDir, TarContent) ->
      WriteEntry = fun({Name, Bin}) ->
          Target = filename:join(BaseDir, Name),
          ok = filelib:ensure_dir(Target),
          ok = file:write_file(Target, Bin)
      end,
      lists:foreach(WriteEntry, TarContent).
  %% Undo write_tar_file_content/2: for every tar entry that still exists,
  %% recursively delete its top-level directory (or file) below BaseDir.
  delete_tar_file_content(BaseDir, TarContent) ->
      DeleteEntry = fun({Name, _Bin}) ->
          Target = filename:join(BaseDir, Name),
          case filelib:is_file(Target) of
              false ->
                  %% probably already deleted
                  ok;
              true ->
                  ok = file:del_dir_r(top_dir(BaseDir, Target))
          end
      end,
      lists:foreach(DeleteEntry, TarContent).
  %% Walk up from DirOrFile until the direct child of the base dir is found
  %% and return that child. Throws `{out_of_bounds, Path}' when the walk
  %% reaches "/" or "." without passing through the base dir.
  top_dir(BaseDir0, DirOrFile) ->
      BaseDir = normalize_dir(BaseDir0),
      Parent = filename:dirname(DirOrFile),
      if
          Parent =:= "/"; Parent =:= "." ->
              throw({out_of_bounds, DirOrFile});
          Parent =:= BaseDir ->
              DirOrFile;
          true ->
              top_dir(BaseDir, Parent)
      end.
  %% Strip a possible trailing slash by joining the dir with an empty name.
  normalize_dir(Dir) ->
      filename:join(Dir, "").
  -ifdef(TEST).

  %% Each case is {Expected, Input}.
  normalize_dir_test_() ->
      Cases = [
          {"foo", "foo"},
          {"foo", "foo/"},
          {"/foo", "/foo"},
          {"/foo", "/foo/"}
      ],
      [?_assertEqual(Expected, normalize_dir(Input)) || {Expected, Input} <- Cases].

  %% The first three cases resolve to the direct child of the base dir;
  %% the last two walk out of bounds and must throw.
  top_dir_test_() ->
      RelPath = filename:join(["base", "foo", "bar"]),
      AbsPath = filename:join(["/", "base", "foo", "bar"]),
      [
          ?_assertEqual("base/foo", top_dir("base", RelPath)),
          ?_assertEqual("/base/foo", top_dir("/base", AbsPath)),
          ?_assertEqual("/base/foo", top_dir("/base/", AbsPath)),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
      ].

  -endif.
  %% Extract the package tarball into install_dir and verify that the
  %% plugin info can be read afterwards. When that verification fails, the
  %% files just written are removed again and the read error is returned.
  do_ensure_installed(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      ExtractError = fun(Msg, Why) ->
          {error, #{msg => Msg, path => TarGz, reason => Why}}
      end,
      case erl_tar:extract(TarGz, [compressed, memory]) of
          {ok, TarContent} ->
              ok = write_tar_file_content(install_dir(), TarContent),
              case read_plugin_info(NameVsn, #{}) of
                  {error, Reason} ->
                      ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
                      ok = delete_tar_file_content(install_dir(), TarContent),
                      {error, Reason};
                  {ok, _} ->
                      ok
              end;
          {error, {_, enoent}} ->
              %% The tarball itself is missing.
              ExtractError("failed_to_extract_plugin_package", plugin_tarball_not_found);
          {error, Reason} ->
              ExtractError("bad_plugin_package", Reason)
      end.
  %% Drop the plugin from the configured plugins list. Names are compared
  %% in binary form.
  ensure_delete(NameVsn0) ->
      NameVsn = bin(NameVsn0),
      IsOtherPlugin = fun(#{name_vsn := Configured}) -> bin(Configured) =/= NameVsn end,
      Before = configured(),
      put_configured(lists:filter(IsOtherPlugin, Before)),
      ok.
  %% Set the enable flag (and optionally the position) of an installed
  %% plugin in the configured list. Fails when the plugin info cannot be
  %% read. The entry is stored with NameVsn as a string.
  ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
      ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  ensure_state(NameVsn, Position, State, ConfLocation) ->
      case read_plugin_info(NameVsn, #{}) of
          {error, Reason} ->
              ?SLOG(error, #{msg => "ensure_plugin_states_failed", reason => Reason}),
              {error, Reason};
          {ok, _} ->
              Item = #{name_vsn => NameVsn, enable => State},
              Configure = fun() -> ensure_configured(Item, Position, ConfLocation) end,
              tryit("ensure_state", Configure)
      end.
  %% Insert or replace Item in the configured list and store the result.
  %% - already configured, `no_move': replaced in place;
  %% - already configured, other position: removed, then re-added there;
  %% - not configured yet: added at the given position.
  ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
      Configured = configured(),
      IsOtherPlugin = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      NewConfigured =
          case lists:splitwith(IsOtherPlugin, Configured) of
              {Front, [_Old | More]} when Position =:= no_move ->
                  Front ++ [Item | More];
              {Front, [_Old | More]} ->
                  add_new_configured(Front ++ More, Position, Item);
              {_Front, []} ->
                  add_new_configured(Configured, Position, Item)
          end,
      ok = put_configured(NewConfigured, ConfLocation).
  %% Add Item to the configured list at the requested position:
  %% `front', `rear' (also the default for `no_move'), or
  %% `{before | behind, AnchorNameVsn}' relative to another configured
  %% plugin. Throws when the anchor plugin is not configured.
  add_new_configured(Configured, no_move, Item) ->
      %% default to rear
      add_new_configured(Configured, rear, Item);
  add_new_configured(Configured, front, Item) ->
      [Item | Configured];
  add_new_configured(Configured, rear, Item) ->
      Configured ++ [Item];
  add_new_configured(Configured, {Action, NameVsn}, Item) ->
      IsNotAnchor = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      case lists:splitwith(IsNotAnchor, Configured) of
          {_Front, []} ->
              throw(#{
                  msg => "position_anchor_plugin_not_configured",
                  hint => "maybe_install_and_configure",
                  name_vsn => NameVsn
              });
          {Front, [Anchor | After] = Rear} ->
              case Action of
                  before ->
                      Front ++ [Item | Rear];
                  behind ->
                      Front ++ [Anchor, Item | After]
              end
      end.
  %% Drop the plugin's cached config from persistent_term.
  %% The config is stored and looked up under the BINARY form of NameVsn
  %% (see get_config/3 and put_config/3), so the key must be normalised the
  %% same way here. Otherwise a string NameVsn would erase nothing and a
  %% stale config would survive the purge.
  maybe_purge_plugin_config(NameVsn) ->
      _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn))),
      ok.
  %% Recursively delete the plugin's extracted directory.
  purge_plugin(NameVsn) ->
      purge_plugin_dir(plugin_dir(NameVsn)).
  %% Recursively delete Dir. A missing directory counts as success; any
  %% other failure is logged and returned as `{error, Reason}'.
  purge_plugin_dir(Dir) ->
      case file:del_dir_r(Dir) of
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_purge_plugin_dir",
                  dir => Dir,
                  reason => Reason
              }),
              {error, Reason};
          ok ->
              ?SLOG(info, #{
                  msg => "purged_plugin_dir",
                  dir => Dir
              })
      end.
  %% Order the installed plugin infos so that configured plugins come
  %% first, in configuration order, followed by the remaining ones.
  %% A configured plugin that is not installed is logged and skipped.
  do_list([], All) ->
      All;
  do_list([#{name_vsn := NameVsn} | Rest], All) ->
      IsOtherPlugin = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          bin([Name, "-", Vsn]) =/= bin(NameVsn)
      end,
      case lists:splitwith(IsOtherPlugin, All) of
          {Before, [Found | After]} ->
              [Found | do_list(Rest, Before ++ After)];
          {_, []} ->
              ?SLOG(warning, #{
                  msg => "configured_plugin_not_installed",
                  name_vsn => NameVsn
              }),
              do_list(Rest, All)
      end.
  %% Make sure the plugin is installed (extracting or fetching the package
  %% when needed), then load its code and start its apps.
  %% Returns `ok' or `{error, ReasonMap}'. Throws and errors raised on the
  %% way are turned into `{error, _}' by tryit/2.
  do_ensure_started(NameVsn) ->
      tryit(
          "start_plugins",
          fun() ->
              case ensure_exists_and_installed(NameVsn) of
                  ok ->
                      Plugin = do_read_plugin(NameVsn),
                      ok = load_code_start_apps(NameVsn, Plugin);
                  {error, ReasonMap} ->
                      %% Not every install error carries a `reason' key
                      %% (e.g. a name/version mismatch in the info file).
                      %% Log whatever is available instead of failing with
                      %% a case_clause that would hide the real error.
                      Reason =
                          case ReasonMap of
                              #{reason := R} -> R;
                              _ -> ReasonMap
                          end,
                      ?SLOG(error, #{
                          msg => "failed_to_start_plugin",
                          name_vsn => NameVsn,
                          reason => Reason
                      }),
                      {error, ReasonMap}
              end
          end
      ).
  %%--------------------------------------------------------------------
  %% Run F and translate exceptions into `{error, Map}' return values:
  %% - a thrown map is a known error and is returned as is;
  %% - any other thrown term is wrapped as `#{reason => Term}';
  %% - `error'-class exceptions are unexpected: they are logged with their
  %%   stacktrace and returned together with the operation name.
  %% `exit'-class exceptions are not caught.
  tryit(WhichOp, F) ->
      try
          F()
      catch
          throw:KnownError when is_map(KnownError) ->
              %% no stacktrace needed for known errors
              {error, KnownError};
          throw:Thrown ->
              {error, #{reason => Thrown}};
          error:Exception:Stacktrace ->
              Failure = #{
                  which_op => WhichOp,
                  exception => Exception,
                  stacktrace => Stacktrace
              },
              ?SLOG(warning, Failure#{msg => "plugin_op_failed"}),
              {error, Failure}
      end.
  %% Read and validate the plugin info from its JSON info file.
  %% Returns `{ok, Info}' or `{error, Reason}'; exceptions are translated
  %% by tryit/2.
  read_plugin_info(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadInfo = fun() -> {ok, do_read_plugin(NameVsn, Options)} end,
      tryit(OpName, ReadInfo).
  %% Read the plugin info map; crashes or throws on any problem.
  %% Arity 1 uses empty options, arity 2 derives the info file path from
  %% NameVsn.
  do_read_plugin(NameVsn) ->
      do_read_plugin(NameVsn, #{}).

  do_read_plugin(NameVsn, Option) ->
      InfoFilePath = info_file_path(NameVsn),
      do_read_plugin(NameVsn, InfoFilePath, Option).

  %% Steps: read the JSON file, validate the mandatory fields, optionally
  %% attach the readme, then attach the running and config status.
  do_read_plugin(NameVsn, InfoFilePath, Options) ->
      ReadFile = read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}),
      {ok, PlainMap} = ReadFile(),
      Checked = check_plugin(PlainMap, NameVsn, InfoFilePath),
      WithReadme = plugins_readme(NameVsn, Options, Checked),
      plugin_status(NameVsn, WithReadme).
  %% Read the plugin's avsc schema file; the default read mode is JSON map.
  read_plugin_avsc(NameVsn) ->
      read_plugin_avsc(NameVsn, #{read_mode => ?JSON_MAP}).

  read_plugin_avsc(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFile = read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options),
      tryit(OpName, ReadFile).
  %% Read the plugin's i18n file; the default read mode is JSON map.
  read_plugin_i18n(NameVsn) ->
      read_plugin_i18n(NameVsn, #{read_mode => ?JSON_MAP}).

  read_plugin_i18n(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFile = read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options),
      tryit(OpName, ReadFile).
  %% Read the plugin's HOCON config file; the default read mode is raw binary.
  read_plugin_hocon(NameVsn) ->
      read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}).

  read_plugin_hocon(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadFile = read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options),
      tryit(OpName, ReadFile).
  %% Make sure the plugin directory exists. When it does not, install the
  %% plugin from the local package, and failing that, from another node of
  %% the cluster.
  ensure_exists_and_installed(NameVsn) ->
      AlreadyExtracted = filelib:is_dir(plugin_dir(NameVsn)),
      case AlreadyExtracted of
          true ->
              ok;
          false ->
              %% Do we have the package, but it's not extracted yet?
              case get_tar(NameVsn) of
                  {ok, TarContent} ->
                      ok = file:write_file(pkg_file_path(NameVsn), TarContent),
                      do_ensure_installed(NameVsn);
                  _NoLocalPackage ->
                      %% If not, try to get it from the cluster.
                      do_get_from_cluster(NameVsn)
              end
      end.
  %% Fetch the plugin package from any other running node, store it as the
  %% local package file and install it.
  %% Returns `ok' or `{error, Map}'. An installation failure of the fetched
  %% package is returned as an error tuple (like the local-package path in
  %% ensure_exists_and_installed/1) instead of crashing with a badmatch.
  do_get_from_cluster(NameVsn) ->
      Nodes = [N || N <- mria:running_nodes(), N /= node()],
      case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
          {ok, TarContent} ->
              ok = file:write_file(pkg_file_path(NameVsn), TarContent),
              do_ensure_installed(NameVsn);
          {error, NodeErrors} when Nodes =/= [] ->
              %% There were peers, but none could deliver the package.
              ErrMeta = #{
                  msg => "failed_to_copy_plugin_from_other_nodes",
                  name_vsn => NameVsn,
                  node_errors => NodeErrors,
                  reason => plugin_not_found
              },
              ?SLOG(error, ErrMeta),
              {error, ErrMeta};
          {error, _} ->
              %% No other running node to ask.
              ErrMeta = #{
                  msg => "no_nodes_to_copy_plugin_from",
                  name_vsn => NameVsn,
                  reason => plugin_not_found
              },
              ?SLOG(error, ErrMeta),
              {error, ErrMeta}
      end.
  %% Ask the nodes one by one for the plugin tarball and return the first
  %% successful answer. When every node fails, the collected
  %% `{Node, Error}' pairs are returned, most recent failure first.
  get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
      {error, Errors};
  get_plugin_tar_from_any_node([Node | MoreNodes], NameVsn, Errors) ->
      case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
          {ok, _} = Found ->
              ?SLOG(debug, #{
                  msg => "get_plugin_tar_from_cluster_successfully",
                  node => Node,
                  name_vsn => NameVsn
              }),
              Found;
          Failure ->
              get_plugin_tar_from_any_node(MoreNodes, NameVsn, [{Node, Failure} | Errors])
      end.
  %% Ask the nodes one by one for the plugin config (map format, 5 second
  %% timeout per call) and return the first successful answer. When every
  %% node fails, the collected `{Node, Error}' pairs are returned, most
  %% recent failure first.
  get_plugin_config_from_any_node([], _NameVsn, Errors) ->
      {error, Errors};
  get_plugin_config_from_any_node([Node | MoreNodes], NameVsn, Errors) ->
      Answer = emqx_plugins_proto_v2:get_config(
          Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
      ),
      case Answer of
          {ok, _} = Found ->
              ?SLOG(debug, #{
                  msg => "get_plugin_config_from_cluster_successfully",
                  node => Node,
                  name_vsn => NameVsn
              }),
              Found;
          Failure ->
              get_plugin_config_from_any_node(MoreNodes, NameVsn, [{Node, Failure} | Errors])
      end.
  %% Attach the readme content to the info map when `fill_readme' is true.
  %% An unreadable readme file results in an empty binary.
  plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
      Readme =
          case file:read_file(readme_file(NameVsn)) of
              {ok, Bin} -> Bin;
              _ -> <<>>
          end,
      Info#{readme => Readme};
  plugins_readme(_NameVsn, _Options, Info) ->
      Info.
  %% Attach `running_status' (running | loaded | stopped) and
  %% `config_status' (enabled | disabled | not_configured) to the info map.
  plugin_status(NameVsn, Info) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      RunningStatus =
          case application:get_key(AppName, vsn) of
              undefined ->
                  %% the application is not even loaded
                  stopped;
              {ok, _} ->
                  case lists:keyfind(AppName, 1, running_apps()) of
                      {AppName, _} -> running;
                      _ -> loaded
                  end
          end,
      %% Collect the enable flags of the config entries for this plugin.
      PickEnableFlag = fun(#{name_vsn := Nv, enable := Enabled}) ->
          case bin(Nv) =:= bin(NameVsn) of
              true -> {true, Enabled};
              false -> false
          end
      end,
      ConfigStatus =
          case lists:filtermap(PickEnableFlag, configured()) of
              [] -> not_configured;
              [true] -> enabled;
              [false] -> disabled
          end,
      Info#{
          running_status => RunningStatus,
          config_status => ConfigStatus
      }.
  %% Validate the content of a plugin info file and return it unchanged.
  %% Throws a descriptive map when
  %% - a mandatory field (name, rel_vsn, rel_apps, description) is missing,
  %% - `<name>-<rel_vsn>' does not equal NameVsn, or
  %% - rel_apps is not a non-empty list of `<app>-<vsn>' strings.
  check_plugin(
      #{
          <<"name">> := Name,
          <<"rel_vsn">> := Vsn,
          <<"rel_apps">> := Apps,
          <<"description">> := _
      } = Info,
      NameVsn,
      FilePath
  ) ->
      case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
          false ->
              throw(#{
                  msg => "name_vsn_mismatch",
                  name_vsn => NameVsn,
                  path => FilePath,
                  name => Name,
                  rel_vsn => Vsn
              });
          true ->
              AssertAppNameVsn = fun(App) -> {ok, _, _} = parse_name_vsn(App) end,
              try
                  %% must be a non-empty list ...
                  [_ | _] = Apps,
                  %% ... whose elements all parse as <app>-<vsn>
                  lists:foreach(AssertAppNameVsn, Apps)
              catch
                  _:_ ->
                      throw(#{
                          msg => "bad_rel_apps",
                          rel_apps => Apps,
                          hint => "A non-empty string list of app_name-app_vsn format"
                      })
              end,
              Info
      end;
  check_plugin(_What, NameVsn, File) ->
      throw(#{
          msg => "bad_info_file_content",
          mandatory_fields => [rel_vsn, name, rel_apps, description],
          name_vsn => NameVsn,
          path => File
      }).
  %% Load all apps of the plugin release first, then start them in the
  %% order they are listed in `rel_apps'.
  load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
      LibDir = filename:join([install_dir(), RelNameVsn]),
      RunningApps = running_apps(),
      %% load plugin apps and beam code
      AppNames = [
          begin
              {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
              EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
              ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
              AppName
          end
       || AppNameVsn <- Apps
      ],
      lists:foreach(fun start_app/1, AppNames).
  %% Load the app's code unless an app of that name is already running.
  %% A running app of a different version is left alone; only a warning
  %% is logged.
  load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
      case lists:keyfind(AppName, 1, RunningApps) of
          {_, RunningVsn} ->
              SameVersion = bin(RunningVsn) =:= bin(AppVsn),
              case SameVersion of
                  false ->
                      %% running but a different version
                      ?SLOG(warning, #{
                          msg => "plugin_app_already_running",
                          name => AppName,
                          running_vsn => RunningVsn,
                          loading_vsn => AppVsn
                      });
                  true ->
                      %% already started on the exact version
                      ok
              end;
          false ->
              do_load_plugin_app(AppName, Ebin)
      end.
  %% Add the ebin dir to the front of the code path, (re)load every beam
  %% file found in it, then load the application itself.
  %% Throws a descriptive map when a beam or the application fails to load.
  do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
      do_load_plugin_app(AppName, binary_to_list(Ebin));
  do_load_plugin_app(AppName, Ebin) ->
      _ = code:add_patha(Ebin),
      BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
      LoadBeam = fun(BeamFile) ->
          Module = list_to_atom(filename:basename(BeamFile, ".beam")),
          %% drop old code first so the fresh version can be loaded
          _ = code:purge(Module),
          case code:load_file(Module) of
              {error, Reason} ->
                  throw(#{
                      msg => "failed_to_load_plugin_beam",
                      path => BeamFile,
                      reason => Reason
                  });
              {module, _} ->
                  ok
          end
      end,
      lists:foreach(LoadBeam, BeamFiles),
      case application:load(AppName) of
          ok ->
              ok;
          {error, {already_loaded, _}} ->
              ok;
          {error, LoadError} ->
              throw(#{
                  msg => "failed_to_load_plugin_app",
                  name => AppName,
                  reason => LoadError
              })
      end.
  %% Start the app together with its dependencies.
  %% Throws a descriptive map when any of them fails to start.
  start_app(App) ->
      case application:ensure_all_started(App) of
          {error, {ErrApp, Reason}} ->
              throw(#{
                  msg => "failed_to_start_plugin_app",
                  app => App,
                  err_app => ErrApp,
                  reason => Reason
              });
          {ok, Started} ->
              %% only log the list when something was actually started
              Started =:= [] orelse
                  ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started}),
              ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
              ok
      end.
  %% Stop all apps installed by the plugin package,
  %% but not the ones shared with others.
  %% The apps are taken from the `<<"rel_apps">>' entry of the plugin info map.
  %% Apps that could not be stopped are reported with a warning and `ok' is
  %% still returned; `{error, Reason}' from `tryit/2' is passed through unchanged.
  ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
      %% resolve the names of the apps that are candidates for stopping
      AppsToStop = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
      StopFun = fun() -> stop_apps(AppsToStop) end,
      case tryit("stop_apps", StopFun) of
          {ok, []} ->
              %% nothing left running
              ok;
          {ok, Left} ->
              ?SLOG(warning, #{
                  msg => "unabled_to_stop_plugin_apps",
                  apps => Left,
                  reason => "running_apps_still_depends_on_this_apps"
              }),
              ok;
          {error, _Reason} = Error ->
              Error
      end.
  %% An Elixir plugin may bundle Elixir itself when it targets a non-Elixir EMQX
  %% release. When the EMQX release already includes Elixir, however, neither
  %% Elixir nor IEx may be stopped together with the plugin.
  %%
  %% `parse_name_vsn_for_stopping/1' is a `lists:filtermap/2' callback: it returns
  %% `{true, AppName}' for an app that may be stopped and `false' for a protected one.
  -ifdef(EMQX_ELIXIR).

  %% Apps that must never be stopped in an Elixir-based release.
  is_protected_app(elixir) -> true;
  is_protected_app(iex) -> true;
  is_protected_app(_) -> false.

  parse_name_vsn_for_stopping(NameVsn) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      not is_protected_app(AppName) andalso {true, AppName}.

  %% ELSE ifdef(EMQX_ELIXIR)
  -else.

  %% No protected apps in this build: every app is a candidate for stopping.
  parse_name_vsn_for_stopping(NameVsn) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      {true, AppName}.

  %% END ifdef(EMQX_ELIXIR)
  -endif.
  %% Repeatedly try to stop `Apps' until either all of them are stopped or a
  %% full pass makes no progress. The list of running apps is re-read on every pass.
  %% Returns `{ok, Remaining}', where `Remaining' are the apps that could not be
  %% stopped (empty when everything was stopped).
  stop_apps(Apps) ->
      case do_stop_apps(Apps, [], running_apps()) of
          {ok, []} ->
              %% everything is stopped
              {ok, []};
          {ok, Apps} ->
              %% the very same list came back: no progress, give up
              {ok, Apps};
          {ok, Remaining} ->
              %% some apps were stopped, retry with what is left
              stop_apps(Remaining)
      end.
  %% One pass over the apps to stop: an app that is still needed by one of
  %% `RunningApps' is kept, every other app is stopped via `stop_app/1'.
  %% Kept apps are accumulated in front of `Remain' and returned as
  %% `{ok, Kept}' in their original order.
  do_stop_apps(Apps, Remain, RunningApps) ->
      StopOrKeep = fun(App, Kept) ->
          case is_needed_by_any(App, RunningApps) of
              true ->
                  [App | Kept];
              false ->
                  ok = stop_app(App),
                  Kept
          end
      end,
      {ok, lists:reverse(lists:foldl(StopOrKeep, Remain, Apps))}.
  %% Stop a single application and unload its modules and specification.
  %% An application that was not started is unloaded as well.
  %% Throws a map with the reason for any other error from `application:stop/1'.
  stop_app(App) ->
      case application:stop(App) of
          {error, {not_started, App}} ->
              ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
              ok = unload_module_and_app(App);
          {error, Reason} ->
              throw(#{msg => "failed_to_stop_app", app => App, reason => Reason});
          ok ->
              ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
              ok = unload_module_and_app(App)
      end.
  %% Soft-purge every module listed in the `modules' key of `App' (when that key
  %% is available) and then unload the application specification.
  %% The result of `application:unload/1' is ignored; always returns `ok'.
  unload_module_and_app(App) ->
      Modules =
          case application:get_key(App, modules) of
              {ok, Mods} -> Mods;
              _ -> []
          end,
      lists:foreach(fun(Module) -> code:soft_purge(Module) end, Modules),
      _ = application:unload(App),
      ok.
  %% True when at least one entry of `RunningApps' (a list of `{App, Vsn}' pairs)
  %% needs `AppToStop' according to `is_needed_by/2'.
  is_needed_by_any(_AppToStop, []) ->
      false;
  is_needed_by_any(AppToStop, [{RunningApp, _RunningAppVsn} | Rest]) ->
      is_needed_by(AppToStop, RunningApp) orelse is_needed_by_any(AppToStop, Rest).
  %% True when `AppToStop' is listed in the `applications' key of `RunningApp'.
  %% An application never counts as needed by itself, and an application whose
  %% `applications' key is undefined needs nothing.
  is_needed_by(AppToStop, RunningApp) when AppToStop =:= RunningApp ->
      false;
  is_needed_by(AppToStop, RunningApp) ->
      case application:get_key(RunningApp, applications) of
          undefined -> false;
          {ok, Deps} -> lists:member(AppToStop, Deps)
      end.
  %% Write `Values' (keys converted with `bin_key/1') under the plugins config root.
  %% A single atom key is treated as a one-element path.
  %%  - `local':  update through `emqx:update_config/3';
  %%  - `global': update through `emqx_conf:update/3'.
  %% Returns `ok' when the update returns `{ok, _}'; any other result is returned as is.
  do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) ->
      do_put_config_internal([Key], Value, ConfLocation);
  do_put_config_internal(Path, Values, local) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      %% This clause is used when already inside cluster_rpc: emqx_conf:update
      %% must not be used here, it would result in dead calls.
      case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Other -> Other
      end;
  do_put_config_internal(Path, Values, global) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Other -> Other
      end.
  1028. %%--------------------------------------------------------------------
  1029. %% `emqx_config_handler' API
  1030. %%--------------------------------------------------------------------
  %% Called after a config update. For an update of the plugins config root the
  %% new and old `states' lists are indexed by `name_vsn' and every changed entry
  %% is handed to `enable_disable_plugin/2'. Updates of any other path are ignored.
  %% Always returns `ok'.
  post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
      IndexByNameVsn = fun(States) ->
          maps:from_list([{NameVsn, State} || #{name_vsn := NameVsn} = State <- States])
      end,
      NewIndex = IndexByNameVsn(NewStates),
      OldIndex = IndexByNameVsn(OldStates),
      #{changed := Changed} = emqx_utils_maps:diff_maps(NewIndex, OldIndex),
      ok = maps:foreach(fun enable_disable_plugin/2, Changed),
      ok;
  post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
      ok.
  %% React to a changed plugin state. `Diff' is a pair of state maps
  %% (presumably `{Old, New}' as produced by emqx_utils_maps:diff_maps/2 - TODO confirm):
  %%  - first enabled, second disabled: the plugin is stopped;
  %%  - first disabled, second enabled: the plugin is started;
  %%  - anything else: nothing to do.
  %% Results of starting/stopping are ignored; always returns `ok'.
  enable_disable_plugin(NameVsn, Diff) ->
      case Diff of
          {#{enable := true}, #{enable := false}} ->
              %% errors are already logged by ensure_stopped/1
              _ = ensure_stopped(NameVsn),
              ok;
          {#{enable := false}, #{enable := true}} ->
              %% errors are already logged by ensure_started/1
              _ = ensure_started(NameVsn),
              ok;
          _ ->
              ok
      end.
  1049. %%--------------------------------------------------------------------
  1050. %% Helper functions
  1051. %%--------------------------------------------------------------------
  %% The configured `install_dir' of the plugins config, or "" when it is not set.
  install_dir() ->
      Default = "",
      get_config_interal(install_dir, Default).
  %% Store the list of configured plugin states; the config location defaults to `local'.
  put_configured(Configured) ->
      put_configured(Configured, local).

  %% Store the list of configured plugin states under the `states' key, with map
  %% keys converted by `bin_key/1'. Crashes unless the update returns `ok'.
  put_configured(Configured, ConfLocation) ->
      States = bin_key(Configured),
      ok = do_put_config_internal(states, States, ConfLocation).
  %% The configured plugin `states', or an empty list when none are configured.
  configured() ->
      Default = [],
      get_config_interal(states, Default).
  %% Apply `ActionFun' to every configured plugin state. `ActionFun' must return
  %% a list (it is used with `lists:flatmap/2'); all returned elements are
  %% treated as errors, traced and logged. Always returns `ok'.
  for_plugins(ActionFun) ->
      Errors = lists:flatmap(ActionFun, configured()),
      case Errors of
          [] ->
              ok;
          _ ->
              ErrMeta = #{function => ActionFun, errors => Errors},
              ?tp(
                  for_plugins_action_error_occurred,
                  ErrMeta
              ),
              ?SLOG(error, ErrMeta#{msg => "for_plugins_action_error_occurred"}),
              ok
      end.
  %% Post-installation steps: load the config schema / create the config dir
  %% (result ignored) and then make sure the plugin has a state entry.
  %% `NameVsn0' is normalized to a string first.
  maybe_post_op_after_installed(NameVsn0, Mode) ->
      NameVsnStr = wrap_to_list(NameVsn0),
      _ = maybe_load_config_schema(NameVsnStr, Mode),
      ok = maybe_ensure_state(NameVsnStr),
      ok.
  %% Make sure `NameVsn' has a state entry.
  %% Every configured entry whose `name_vsn' equals `NameVsn' is re-ensured with
  %% its current `enable' flag; when there is no such entry the plugin is
  %% ensured with `enable = false'. Results of `ensure_state/4' are ignored.
  maybe_ensure_state(NameVsn) ->
      EnsureIfConfigured = fun
          (#{name_vsn := NV, enable := Enable}, _Ensured) when NV =:= NameVsn ->
              %% Configured, using existed cluster config
              _ = ensure_state(NV, no_move, Enable, global),
              true;
          (#{name_vsn := _, enable := _}, Ensured) ->
              Ensured
      end,
      case lists:foldl(EnsureIfConfigured, false, configured()) of
          true ->
              ok;
          false ->
              ?SLOG(info, #{msg => "plugin_not_configured", name_vsn => NameVsn}),
              %% Clean installation, no config, ensure with `Enable = false`
              _ = ensure_state(NameVsn, no_move, false, global),
              ok
      end.
  %% Load the plugin's avsc config schema when `with_plugin_avsc/1' holds and the
  %% schema file is a regular file, then try to create the plugin config dir.
  %% The result of the schema step is ignored; the result of
  %% `maybe_create_config_dir/2' is returned.
  maybe_load_config_schema(NameVsn, Mode) ->
      AvscPath = avsc_file_path(NameVsn),
      HasSchemaFile = with_plugin_avsc(NameVsn) andalso filelib:is_regular(AvscPath),
      _ = HasSchemaFile andalso do_load_config_schema(NameVsn, AvscPath),
      maybe_create_config_dir(NameVsn, Mode).
  %% Register the avsc schema at `AvscPath' for the plugin through
  %% `emqx_plugins_serde:add_schema/2'.
  %% Always returns `ok': an already existing schema is not an error, and any
  %% other failure is best-effort but logged as a warning instead of being
  %% silently dropped.
  do_load_config_schema(NameVsn, AvscPath) ->
      case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of
          ok ->
              ok;
          {error, already_exists} ->
              ok;
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_load_plugin_config_schema",
                  name_vsn => NameVsn,
                  avsc_path => AvscPath,
                  reason => Reason
              }),
              ok
      end.
  %% Create the plugin config dir only when `with_plugin_avsc/1' holds.
  %% Returns `false' otherwise, or the result of `do_create_config_dir/2'.
  maybe_create_config_dir(NameVsn, Mode) ->
      HasAvsc = with_plugin_avsc(NameVsn),
      HasAvsc andalso do_create_config_dir(NameVsn, Mode).
  %% Create the config directory of the plugin and, on success, make sure a
  %% plugin config is in place (result of that step is ignored).
  %% Returns `ok', `{error, {gen_config_dir_failed, Reason}}' when the directory
  %% name cannot be derived, or `{error, {mkdir_failed, Dir, Reason}}' when the
  %% directory cannot be created (also logged as a warning).
  do_create_config_dir(NameVsn, Mode) ->
      case plugin_config_dir(NameVsn) of
          {error, Reason} ->
              {error, {gen_config_dir_failed, Reason}};
          ConfigDir ->
              case filelib:ensure_path(ConfigDir) of
                  {error, MkdirReason} ->
                      ?SLOG(warning, #{
                          msg => "failed_to_create_plugin_config_dir",
                          dir => ConfigDir,
                          reason => MkdirReason
                      }),
                      {error, {mkdir_failed, ConfigDir, MkdirReason}};
                  ok ->
                      %% get config from other nodes or get from tarball
                      _ = maybe_ensure_plugin_config(NameVsn, Mode),
                      ok
              end
      end.
  %% Ensure the plugin config only when `with_plugin_avsc/1' returns `true';
  %% any other value results in `ok' without doing anything.
  -spec maybe_ensure_plugin_config(name_vsn(), ?fresh_install | ?normal) -> ok.
  maybe_ensure_plugin_config(NameVsn, Mode) ->
      case with_plugin_avsc(NameVsn) of
          true -> ensure_plugin_config({NameVsn, Mode});
          _ -> ok
      end.
  %% Ensure the plugin config depending on the installation mode:
  %%  - `?normal': try the other running nodes (see `ensure_plugin_config/2');
  %%  - `?fresh_install': use the default config file shipped with the plugin.
  -spec ensure_plugin_config({name_vsn(), ?fresh_install | ?normal}) -> ok.
  ensure_plugin_config({NameVsn, ?normal}) ->
      OtherNodes = lists:filter(fun(Node) -> Node /= node() end, mria:running_nodes()),
      ensure_plugin_config(NameVsn, OtherNodes);
  ensure_plugin_config({NameVsn, ?fresh_install}) ->
      ?SLOG(debug, #{
          msg => "default_plugin_config_used",
          name_vsn => NameVsn,
          hint => "fresh_install"
      }),
      cp_default_config_file(NameVsn).
  %% Ensure the plugin config, given the list of other nodes to ask.
  %%  - no other nodes: fall back to the default config file;
  %%  - a config map is obtained from some node: it is pretty-printed as HOCON,
  %%    written to the plugin config file and then loaded via `ensure_config_map/1';
  %%  - otherwise: log an error and fall back to the default config file.
  %% Crashes when the config file cannot be written.
  -spec ensure_plugin_config(name_vsn(), list()) -> ok.
  ensure_plugin_config(NameVsn, []) ->
      ?SLOG(debug, #{
          msg => "default_plugin_config_used",
          name_vsn => NameVsn,
          reason => "no_other_running_nodes"
      }),
      cp_default_config_file(NameVsn);
  ensure_plugin_config(NameVsn, Nodes) ->
      case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
          {ok, ClusterConfig} when is_map(ClusterConfig) ->
              Hocon = hocon_pp:do(ClusterConfig, #{}),
              ConfigFile = plugin_config_file(NameVsn),
              ok = filelib:ensure_dir(ConfigFile),
              ok = file:write_file(ConfigFile, Hocon),
              ensure_config_map(NameVsn);
          _ ->
              ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
              cp_default_config_file(NameVsn)
      end.
  %% Copy the default HOCON config shipped with the plugin into the plugin config
  %% dir, but only when the default file exists and no config file is present yet.
  %% After a successful copy, and also when nothing had to be copied, the config
  %% is loaded via `ensure_config_map/1'. A failed copy is only logged as a warning.
  -spec cp_default_config_file(name_vsn()) -> ok.
  cp_default_config_file(NameVsn) ->
      Source = default_plugin_config_file(NameVsn),
      Destination = plugin_config_file(NameVsn),
      %% copy only when there is a default file and the destination does not
      %% exist yet (i.e. the plugin is not configured)
      NeedsCopy = filelib:is_regular(Source) andalso not filelib:is_regular(Destination),
      case NeedsCopy of
          true ->
              ok = filelib:ensure_dir(Destination),
              case file:copy(Source, Destination) of
                  {error, Reason} ->
                      ?SLOG(warning, #{
                          msg => "failed_to_copy_plugin_default_hocon_config",
                          source => Source,
                          destination => Destination,
                          reason => Reason
                      });
                  {ok, _} ->
                      ensure_config_map(NameVsn)
              end;
          false ->
              ensure_config_map(NameVsn)
      end.
  %% Read the plugin HOCON config as a JSON-like map and store it with `put_config/3'.
  %% With an avsc schema the map goes through `do_ensure_config_map/2'; without
  %% one it is stored directly, tagged `?plugin_without_config_schema'.
  %% A config that cannot be read is logged as a warning and `ok' is returned.
  ensure_config_map(NameVsn) ->
      maybe
          {ok, ConfigJsonMap} ?= read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}),
          case with_plugin_avsc(NameVsn) of
              false ->
                  ?SLOG(debug, #{
                      msg => "put_plugin_config_directly",
                      hint => "plugin_without_config_schema",
                      name_vsn => NameVsn
                  }),
                  put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema);
              true ->
                  do_ensure_config_map(NameVsn, ConfigJsonMap)
          end
      else
          _ ->
              ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
              ok
      end.
  %% Decode `ConfigJsonMap' with `decode_plugin_config_map/2' and, when that
  %% succeeds, store both the map and the decoded value with `put_config/3'.
  %% A decoding failure is logged as an error and `ok' is returned.
  do_ensure_config_map(NameVsn, ConfigJsonMap) ->
      case decode_plugin_config_map(NameVsn, ConfigJsonMap) of
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "plugin_config_validation_failed",
                  name_vsn => NameVsn,
                  reason => Reason
              }),
              ok;
          {ok, AvroValue} ->
              put_config(NameVsn, ConfigJsonMap, AvroValue)
      end.
  %% @private Backup the current config to a file with a timestamp suffix and
  %% then save the new config to the config file.
  %% The new content is first written to `<config file>.tmp'; only when that
  %% succeeds is the current file backed up and replaced. A write failure is
  %% logged as an error and `ok' is returned nevertheless.
  backup_and_write_hocon_bin(NameVsn, HoconBin) ->
      ConfigFile = plugin_config_file(NameVsn),
      %% creating the directory may fail (e.g. read-only file system);
      %% the result is ignored on purpose
      _ = filelib:ensure_dir(ConfigFile),
      TmpFile = ConfigFile ++ ".tmp",
      case file:write_file(TmpFile, HoconBin) of
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_save_plugin_conf_file",
                  hint =>
                      "The updated cluster config is not saved on this node, please check the file system.",
                  filename => TmpFile,
                  reason => Reason
              }),
              %% e.g. read-only, it's not the end of the world
              ok;
          ok ->
              backup_and_replace(ConfigFile, TmpFile)
      end.
  %% Move the current file at `Path' to a timestamped `.bak' file and put the
  %% file at `TmpPath' in its place.
  %%  - backup succeeded: replace the file, then prune old backups;
  %%  - `Path' does not exist yet: just move `TmpPath' into place;
  %%  - any other backup failure: log a warning and return `ok' without replacing.
  backup_and_replace(Path, TmpPath) ->
      BackupFile = Path ++ "." ++ now_time() ++ ".bak",
      case file:rename(Path, BackupFile) of
          {error, enoent} ->
              %% nothing to back up: the config file was not created yet
              ok = file:rename(TmpPath, Path);
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_backup_plugin_conf_file",
                  filename => BackupFile,
                  reason => Reason
              }),
              ok;
          ok ->
              ok = file:rename(TmpPath, Path),
              ok = prune_backup_files(Path)
      end.
  %% Delete old backups of the file at `Path'.
  %% Backups are the files `Path.*' whose name ends with a timestamp suffix
  %% followed by `.bak'. They are sorted in descending order, the first
  %% `?MAX_KEEP_BACKUP_CONFIGS' are kept and the rest is deleted.
  %% A failed deletion is logged as a warning. Returns `ok'.
  prune_backup_files(Path) ->
      BackupRe =
          "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
      Candidates = filelib:wildcard(Path ++ ".*"),
      Backups = [File || File <- Candidates, re:run(File, BackupRe) =/= nomatch],
      %% descending order, so the entries to keep come first
      Descending = lists:reverse(lists:sort(Backups)),
      KeepCount = min(?MAX_KEEP_BACKUP_CONFIGS, length(Descending)),
      ToDelete = lists:nthtail(KeepCount, Descending),
      DeleteFile = fun(File) ->
          case file:delete(File) of
              {error, Reason} ->
                  ?SLOG(warning, #{
                      msg => "failed_to_delete_backup_plugin_conf_file",
                      filename => File,
                      reason => Reason
                  }),
                  ok;
              ok ->
                  ok
          end
      end,
      lists:foreach(DeleteFile, ToDelete).
  %% Build a zero-arity fun that reads the file at `Path' when called.
  %%  - `?RAW_BIN':  the fun returns `{ok, Binary}' with the raw file content;
  %%  - `?JSON_MAP': the fun loads the file as HOCON and returns `{ok, PlainMap}'.
  %% In both modes a read failure makes the fun throw `#{msg => Msg, reason => Reason}'.
  read_file_fun(Path, Msg, #{read_mode := ?RAW_BIN}) ->
      fun() ->
          case file:read_file(Path) of
              {error, Reason} ->
                  throw(#{msg => Msg, reason => Reason});
              {ok, Content} ->
                  {ok, Content}
          end
      end;
  read_file_fun(Path, Msg, #{read_mode := ?JSON_MAP}) ->
      fun() ->
          case hocon:load(Path, #{format => richmap}) of
              {error, Reason} ->
                  throw(#{msg => Msg, reason => Reason});
              {ok, RichMap} ->
                  {ok, hocon_maps:ensure_plain(RichMap)}
          end
      end.
  %% Directories
  %% Directory of the plugin: `NameVsn' under the install dir, as a string.
  -spec plugin_dir(name_vsn()) -> string().
  plugin_dir(NameVsn) ->
      Dir = filename:join([install_dir(), NameVsn]),
      wrap_to_list(Dir).
  %% The `priv' directory of the plugin.
  %% When the plugin info can be read and `app_dir/2' resolves the app directory,
  %% it is `<plugin dir>/<app dir>/priv'; otherwise the priv directory is assumed
  %% to be directly under the plugin root directory.
  -spec plugin_priv_dir(name_vsn()) -> string().
  plugin_priv_dir(NameVsn) ->
      Fallback = fun() ->
          wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
      end,
      case read_plugin_info(NameVsn, #{fill_readme => false}) of
          {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ->
              case app_dir(Name, Apps) of
                  {ok, AppDir} ->
                      wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
                  _ ->
                      Fallback()
              end;
          _ ->
              Fallback()
      end.
  %% Config directory of the plugin: `<data dir>/plugins/<plugin name>'.
  %% Only the name part of `NameVsn' is used. When `NameVsn' cannot be parsed a
  %% warning is logged and `{error, Reason}' is returned.
  -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
  plugin_config_dir(NameVsn) ->
      case parse_name_vsn(NameVsn) of
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_generate_plugin_config_dir_for_plugin",
                  plugin_namevsn => NameVsn,
                  reason => Reason
              }),
              {error, Reason};
          {ok, NameAtom, _Vsn} ->
              Dir = filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]),
              wrap_to_list(Dir)
      end.
  1334. %% Files
  %% Path of the plugin package: `<install dir>/<NameVsn>.tar.gz'.
  -spec pkg_file_path(name_vsn()) -> string().
  pkg_file_path(NameVsn) ->
      InstallDir = install_dir(),
      PkgFileName = bin([NameVsn, ".tar.gz"]),
      wrap_to_list(filename:join([InstallDir, PkgFileName])).
  %% Path of the plugin's `release.json' inside the plugin directory.
  -spec info_file_path(name_vsn()) -> string().
  info_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      wrap_to_list(filename:join([PluginDir, "release.json"])).
  %% Path of the plugin's `config_schema.avsc' inside its priv directory.
  -spec avsc_file_path(name_vsn()) -> string().
  avsc_file_path(NameVsn) ->
      PrivDir = plugin_priv_dir(NameVsn),
      wrap_to_list(filename:join([PrivDir, "config_schema.avsc"])).
  %% Path of the plugin's `config.hocon' inside its config directory.
  -spec plugin_config_file(name_vsn()) -> string().
  plugin_config_file(NameVsn) ->
      ConfigDir = plugin_config_dir(NameVsn),
      wrap_to_list(filename:join([ConfigDir, "config.hocon"])).
  %% Path of the default `config.hocon' shipped in the plugin's priv directory.
  %% Should only be used while a plugin is being installed.
  -spec default_plugin_config_file(name_vsn()) -> string().
  default_plugin_config_file(NameVsn) ->
      PrivDir = plugin_priv_dir(NameVsn),
      wrap_to_list(filename:join([PrivDir, "config.hocon"])).
  %% Path of the plugin's `config_i18n.json' inside its priv directory.
  -spec i18n_file_path(name_vsn()) -> string().
  i18n_file_path(NameVsn) ->
      PrivDir = plugin_priv_dir(NameVsn),
      wrap_to_list(filename:join([PrivDir, "config_i18n.json"])).
  %% Path of the plugin's `README.md' inside the plugin directory.
  -spec readme_file(name_vsn()) -> string().
  readme_file(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      wrap_to_list(filename:join([PluginDir, "README.md"])).
  %% The currently running applications as a list of `{Name, Vsn}' pairs
  %% (the description element of `application:which_applications/1' is dropped).
  running_apps() ->
      [{Name, Vsn} || {Name, _Desc, Vsn} <- application:which_applications(infinity)].
  %% @private This is the same human-readable timestamp format as
  %% hocon-cli generated app.<time>.config file name.
  %% The result is a flat string `YYYY.MM.DD.hh.mm.ss.mmm' built from the local
  %% time of the current system time in milliseconds.
  now_time() ->
      Millis = os:system_time(millisecond),
      {Date, Time} = calendar:system_time_to_local_time(Millis, millisecond),
      {Year, Month, Day} = Date,
      {Hour, Minute, Second} = Time,
      Format = "~0p.~2..0b.~2..0b.~2..0b.~2..0b.~2..0b.~3..0b",
      Args = [Year, Month, Day, Hour, Minute, Second, Millis rem 1000],
      lists:flatten(io_lib:format(Format, Args)).
  %% Convert the top-level keys of a map to binaries (values are left as they are).
  %% A list whose first element is a map is converted element by element;
  %% any other term is returned unchanged.
  bin_key(Map) when is_map(Map) ->
      PutBinKey = fun(Key, Value, Acc) -> maps:put(bin(Key), Value, Acc) end,
      maps:fold(PutBinKey, #{}, Map);
  bin_key([#{} | _] = Maps) ->
      [bin_key(Elem) || Elem <- Maps];
  bin_key(Other) ->
      Other.
  %% Convert an atom, character data given as a list, or a binary to a binary.
  %% A binary is returned unchanged.
  bin(Bin) when is_binary(Bin) -> Bin;
  bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
  bin(Chars) when is_list(Chars) -> unicode:characters_to_binary(Chars, utf8).
  %% Flatten a path given as iodata (string, binary or a nested mix) into a
  %% plain list of bytes.
  wrap_to_list(Path) ->
      Flat = iolist_to_binary(Path),
      binary_to_list(Flat).