emqx_plugins.erl 20 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/emqx.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -export([ ensure_installed/1
  20. , ensure_uninstalled/1
  21. , ensure_enabled/1
  22. , ensure_enabled/2
  23. , ensure_disabled/1
  24. , delete_package/1
  25. ]).
  26. -export([ ensure_started/0
  27. , ensure_started/1
  28. , ensure_stopped/0
  29. , ensure_stopped/1
  30. , restart/1
  31. , list/0
  32. , describe/1
  33. ]).
  34. -export([ get_config/2
  35. , put_config/2
  36. ]).
  37. %% internal
  38. -export([ do_ensure_started/1
  39. ]).
  40. -ifdef(TEST).
  41. -compile(export_all).
  42. -compile(nowarn_export_all).
  43. -endif.
  44. -include_lib("emqx/include/emqx.hrl").
  45. -include_lib("emqx/include/logger.hrl").
  46. -include("emqx_plugins.hrl").
  47. -type name_vsn() :: binary() | string(). %% "my_plugin-0.1.0"
  48. -type plugin() :: map(). %% the parse result of the JSON info file
  49. -type position() :: no_move | front | rear | {before, name_vsn()}.
  50. %%--------------------------------------------------------------------
  51. %% APIs
  52. %%--------------------------------------------------------------------
  53. %% @doc Describe a plugin.
  54. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  55. describe(NameVsn) -> read_plugin(NameVsn).
  56. %% @doc Install a .tar.gz package placed in install_dir.
  57. -spec ensure_installed(name_vsn()) -> ok | {error, any()}.
  58. ensure_installed(NameVsn) ->
  59. case read_plugin(NameVsn) of
  60. {ok, _} ->
  61. ok;
  62. {error, _} ->
  63. ok = purge(NameVsn),
  64. do_ensure_installed(NameVsn)
  65. end.
  66. do_ensure_installed(NameVsn) ->
  67. TarGz = pkg_file(NameVsn),
  68. case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of
  69. ok ->
  70. case read_plugin(NameVsn) of
  71. {ok, _} -> ok;
  72. {error, Reason} ->
  73. ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
  74. _ = ensure_uninstalled(NameVsn),
  75. {error, Reason}
  76. end;
  77. {error, {_, enoent}} ->
  78. {error, #{ reason => "failed_to_extract_plugin_package"
  79. , path => TarGz
  80. , return => not_found
  81. }};
  82. {error, Reason} ->
  83. {error, #{ reason => "bad_plugin_package"
  84. , path => TarGz
  85. , return => Reason
  86. }}
  87. end.
  88. %% @doc Ensure files and directories for the given plugin are delete.
  89. %% If a plugin is running, or enabled, error is returned.
  90. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  91. ensure_uninstalled(NameVsn) ->
  92. case read_plugin(NameVsn) of
  93. {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
  94. {error, #{reason => "bad_plugin_running_status",
  95. hint => "stop_the_plugin_first"
  96. }};
  97. {ok, #{config_status := enabled}} ->
  98. {error, #{reason => "bad_plugin_config_status",
  99. hint => "disable_the_plugin_first"
  100. }};
  101. _ ->
  102. purge(NameVsn)
  103. end.
  104. %% @doc Ensure a plugin is enabled to the end of the plugins list.
  105. -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  106. ensure_enabled(NameVsn) ->
  107. ensure_enabled(NameVsn, no_move).
  108. %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  109. -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  110. ensure_enabled(NameVsn, Position) ->
  111. ensure_state(NameVsn, Position, true).
  112. %% @doc Ensure a plugin is disabled.
  113. -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  114. ensure_disabled(NameVsn) ->
  115. ensure_state(NameVsn, no_move, false).
  116. ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
  117. ensure_state(binary_to_list(NameVsn), Position, State);
  118. ensure_state(NameVsn, Position, State) ->
  119. case read_plugin(NameVsn) of
  120. {ok, _} ->
  121. Item = #{ name_vsn => NameVsn
  122. , enable => State
  123. },
  124. tryit("ensure_state", fun() -> ensure_configured(Item, Position) end);
  125. {error, Reason} ->
  126. {error, Reason}
  127. end.
  128. ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
  129. Configured = configured(),
  130. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  131. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  132. NewConfigured =
  133. case Rear of
  134. [_ | More] when Position =:= no_move ->
  135. Front ++ [Item | More];
  136. [_ | More] ->
  137. add_new_configured(Front ++ More, Position, Item);
  138. [] ->
  139. add_new_configured(Configured, Position, Item)
  140. end,
  141. ok = put_configured(NewConfigured).
  142. add_new_configured(Configured, no_move, Item) ->
  143. %% default to rear
  144. add_new_configured(Configured, rear, Item);
  145. add_new_configured(Configured, front, Item) ->
  146. [Item | Configured];
  147. add_new_configured(Configured, rear, Item) ->
  148. Configured ++ [Item];
  149. add_new_configured(Configured, {before, NameVsn}, Item) ->
  150. SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
  151. {Front, Rear} = lists:splitwith(SplitFun, Configured),
  152. Rear =:= [] andalso
  153. throw(#{error => "position_anchor_plugin_not_configured",
  154. hint => "maybe_install_and_configure",
  155. name_vsn => NameVsn
  156. }),
  157. Front ++ [Item | Rear].
  158. %% @doc Delete the package file.
  159. -spec delete_package(name_vsn()) -> ok.
  160. delete_package(NameVsn) ->
  161. File = pkg_file(NameVsn),
  162. case file:delete(File) of
  163. ok ->
  164. ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
  165. ok;
  166. {error, enoent} ->
  167. ok;
  168. {error, Reason} ->
  169. ?SLOG(error, #{msg => "failed_to_delete_package_file",
  170. path => File,
  171. reason => Reason}),
  172. {error, Reason}
  173. end.
  174. %% @doc Delete extracted dir
  175. %% In case one lib is shared by multiple plugins.
  176. %% it might be the case that purging one plugin's install dir
  177. %% will cause deletion of loaded beams.
  178. %% It should not be a problem, because shared lib should
  179. %% reside in all the plugin install dirs.
  180. -spec purge(name_vsn()) -> ok.
  181. purge(NameVsn) ->
  182. Dir = dir(NameVsn),
  183. case file:del_dir_r(Dir) of
  184. ok ->
  185. ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
  186. {error, enoent} ->
  187. ok;
  188. {error, Reason} ->
  189. ?SLOG(error, #{msg => "failed_to_purge_plugin_dir",
  190. dir => Dir,
  191. reason => Reason}),
  192. {error, Reason}
  193. end.
  194. %% @doc Start all configured plugins are started.
  195. -spec ensure_started() -> ok.
  196. ensure_started() ->
  197. ok = for_plugins(fun ?MODULE:do_ensure_started/1).
  198. %% @doc Start a plugin from Management API or CLI.
  199. %% the input is a <name>-<vsn> string.
  200. -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  201. ensure_started(NameVsn) ->
  202. case do_ensure_started(NameVsn) of
  203. ok -> ok;
  204. {error, Reason} ->
  205. ?SLOG(alert, #{msg => "failed_to_start_plugin",
  206. reason => Reason}),
  207. {error, Reason}
  208. end.
  209. %% @doc Stop all plugins before broker stops.
  210. -spec ensure_stopped() -> ok.
  211. ensure_stopped() ->
  212. for_plugins(fun ?MODULE:ensure_stopped/1).
  213. %% @doc Stop a plugin from Management API or CLI.
  214. -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  215. ensure_stopped(NameVsn) ->
  216. tryit("stop_plugin",
  217. fun() ->
  218. Plugin = do_read_plugin(NameVsn),
  219. ensure_apps_stopped(Plugin)
  220. end).
  221. %% @doc Stop and then start the plugin.
  222. restart(NameVsn) ->
  223. case ensure_stopped(NameVsn) of
  224. ok -> ensure_started(NameVsn);
  225. {error, Reason} -> {error, Reason}
  226. end.
  227. %% @doc List all installed plugins.
  228. %% Including the ones that are installed, but not enabled in config.
  229. -spec list() -> [plugin()].
  230. list() ->
  231. Pattern = filename:join([install_dir(), "*", "release.json"]),
  232. All = lists:filtermap(
  233. fun(JsonFile) ->
  234. case read_plugin({file, JsonFile}) of
  235. {ok, Info} ->
  236. {true, Info};
  237. {error, Reason} ->
  238. ?SLOG(warning, Reason),
  239. false
  240. end
  241. end, filelib:wildcard(Pattern)),
  242. list(configured(), All).
  243. %% Make sure configured ones are ordered in front.
  244. list([], All) -> All;
  245. list([#{name_vsn := NameVsn} | Rest], All) ->
  246. SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
  247. bin([Name, "-", Vsn]) =/= bin(NameVsn)
  248. end,
  249. case lists:splitwith(SplitF, All) of
  250. {_, []} ->
  251. ?SLOG(warning, #{msg => "configured_plugin_not_installed",
  252. name_vsn => NameVsn
  253. }),
  254. list(Rest, All);
  255. {Front, [I | Rear]} ->
  256. [I | list(Rest, Front ++ Rear)]
  257. end.
  258. do_ensure_started(NameVsn) ->
  259. tryit("start_plugins",
  260. fun() ->
  261. Plugin = do_read_plugin(NameVsn),
  262. ok = load_code_start_apps(NameVsn, Plugin)
  263. end).
  264. %% try the function, catch 'throw' exceptions as normal 'error' return
  265. %% other exceptions with stacktrace returned.
  266. tryit(WhichOp, F) ->
  267. try
  268. F()
  269. catch
  270. throw : Reason ->
  271. %% thrown exceptions are known errors
  272. %% translate to a return value without stacktrace
  273. {error, Reason};
  274. error : Reason : Stacktrace ->
  275. %% unexpected errors, log stacktrace
  276. ?SLOG(warning, #{ msg => "plugin_op_failed"
  277. , which_op => WhichOp
  278. , exception => Reason
  279. , stacktrace => Stacktrace
  280. }),
  281. {error, {failed, WhichOp}}
  282. end.
  283. %% read plugin info from the JSON file
  284. %% returns {ok, Info} or {error, Reason}
  285. read_plugin(NameVsn) ->
  286. tryit("read_plugin_info",
  287. fun() -> {ok, do_read_plugin(NameVsn)} end).
  288. do_read_plugin({file, InfoFile}) ->
  289. [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
  290. case hocon:load(InfoFile, #{format => richmap}) of
  291. {ok, RichMap} ->
  292. Info = check_plugin(hocon_util:richmap_to_map(RichMap), NameVsn, InfoFile),
  293. maps:merge(Info, plugin_status(NameVsn));
  294. {error, Reason} ->
  295. throw(#{error => "bad_info_file",
  296. path => InfoFile,
  297. return => Reason
  298. })
  299. end;
  300. do_read_plugin(NameVsn) ->
  301. do_read_plugin({file, info_file(NameVsn)}).
  302. plugin_status(NameVsn) ->
  303. {AppName, _AppVsn} = parse_name_vsn(NameVsn),
  304. RunningSt =
  305. case application:get_key(AppName, vsn) of
  306. {ok, _} ->
  307. case lists:keyfind(AppName, 1, running_apps()) of
  308. {AppName, _} -> running;
  309. _ -> loaded
  310. end;
  311. undefined ->
  312. stopped
  313. end,
  314. Configured = lists:filtermap(
  315. fun(#{name_vsn := Nv, enable := St}) ->
  316. case bin(Nv) =:= bin(NameVsn) of
  317. true -> {true, St};
  318. false -> false
  319. end
  320. end, configured()),
  321. ConfSt = case Configured of
  322. [] -> not_configured;
  323. [true] -> enabled;
  324. [false] -> disabled
  325. end,
  326. #{ running_status => RunningSt
  327. , config_status => ConfSt
  328. }.
  329. bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
  330. bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
  331. bin(B) when is_binary(B) -> B.
  332. check_plugin(#{ <<"name">> := Name
  333. , <<"rel_vsn">> := Vsn
  334. , <<"rel_apps">> := Apps
  335. , <<"description">> := _
  336. } = Info, NameVsn, File) ->
  337. case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
  338. true ->
  339. try
  340. [_ | _ ] = Apps, %% assert
  341. %% validate if the list is all <app>-<vsn> strings
  342. lists:foreach(fun parse_name_vsn/1, Apps)
  343. catch
  344. _ : _ ->
  345. throw(#{ error => "bad_rel_apps"
  346. , rel_apps => Apps
  347. , hint => "A non-empty string list of app_name-app_vsn format"
  348. })
  349. end,
  350. Info;
  351. false ->
  352. throw(#{ error => "name_vsn_mismatch"
  353. , name_vsn => NameVsn
  354. , path => File
  355. , name => Name
  356. , rel_vsn => Vsn
  357. })
  358. end;
  359. check_plugin(_What, NameVsn, File) ->
  360. throw(#{ error => "bad_info_file_content"
  361. , mandatory_fields => [rel_vsn, name, rel_apps, description]
  362. , name_vsn => NameVsn
  363. , path => File
  364. }).
  365. load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
  366. LibDir = filename:join([install_dir(), RelNameVsn]),
  367. RunningApps = running_apps(),
  368. %% load plugin apps and beam code
  369. AppNames =
  370. lists:map(fun(AppNameVsn) ->
  371. {AppName, AppVsn} = parse_name_vsn(AppNameVsn),
  372. EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
  373. ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
  374. AppName
  375. end, Apps),
  376. lists:foreach(fun start_app/1, AppNames).
  377. load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
  378. case lists:keyfind(AppName, 1, RunningApps) of
  379. false -> do_load_plugin_app(AppName, Ebin);
  380. {_, Vsn} ->
  381. case bin(Vsn) =:= bin(AppVsn) of
  382. true ->
  383. %% already started on the exact versio
  384. ok;
  385. false ->
  386. %% running but a different version
  387. ?SLOG(warning, #{msg => "plugin_app_already_running", name => AppName,
  388. running_vsn => Vsn,
  389. loading_vsn => AppVsn
  390. })
  391. end
  392. end.
  393. do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
  394. do_load_plugin_app(AppName, binary_to_list(Ebin));
  395. do_load_plugin_app(AppName, Ebin) ->
  396. _ = code:add_patha(Ebin),
  397. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  398. lists:foreach(
  399. fun(BeamFile) ->
  400. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  401. case code:load_file(Module) of
  402. {module, _} -> ok;
  403. {error, Reason} -> throw(#{error => "failed_to_load_plugin_beam",
  404. path => BeamFile,
  405. reason => Reason
  406. })
  407. end
  408. end, Modules),
  409. case application:load(AppName) of
  410. ok -> ok;
  411. {error, {already_loaded, _}} -> ok;
  412. {error, Reason} -> throw(#{error => "failed_to_load_plugin_app",
  413. name => AppName,
  414. reason => Reason})
  415. end.
  416. start_app(App) ->
  417. case application:ensure_all_started(App) of
  418. {ok, Started} ->
  419. case Started =/= [] of
  420. true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
  421. false -> ok
  422. end,
  423. ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
  424. ok;
  425. {error, {ErrApp, Reason}} ->
  426. throw(#{error => "failed_to_start_plugin_app",
  427. app => App,
  428. err_app => ErrApp,
  429. reason => Reason
  430. })
  431. end.
  432. %% Stop all apps installed by the plugin package,
  433. %% but not the ones shared with others.
  434. ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
  435. %% load plugin apps and beam code
  436. AppsToStop =
  437. lists:map(fun(NameVsn) ->
  438. {AppName, _AppVsn} = parse_name_vsn(NameVsn),
  439. AppName
  440. end, Apps),
  441. case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
  442. {ok, []} ->
  443. %% all apps stopped
  444. ok;
  445. {ok, Left} ->
  446. ?SLOG(warning, #{msg => "unabled_to_stop_plugin_apps",
  447. apps => Left
  448. }),
  449. ok;
  450. {error, Reason} ->
  451. {error, Reason}
  452. end.
  453. stop_apps(Apps) ->
  454. RunningApps = running_apps(),
  455. case do_stop_apps(Apps, [], RunningApps) of
  456. {ok, []} -> {ok, []}; %% all stopped
  457. {ok, Remain} when Remain =:= Apps -> {ok, Apps}; %% no progress
  458. {ok, Remain} -> stop_apps(Remain) %% try again
  459. end.
  460. do_stop_apps([], Remain, _AllApps) ->
  461. {ok, lists:reverse(Remain)};
  462. do_stop_apps([App | Apps], Remain, RunningApps) ->
  463. case is_needed_by_any(App, RunningApps) of
  464. true ->
  465. do_stop_apps(Apps, [App | Remain], RunningApps);
  466. false ->
  467. ok = stop_app(App),
  468. do_stop_apps(Apps, Remain, RunningApps)
  469. end.
  470. stop_app(App) ->
  471. case application:stop(App) of
  472. ok ->
  473. ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
  474. ok = unload_moudle_and_app(App);
  475. {error, {not_started, App}} ->
  476. ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
  477. ok = unload_moudle_and_app(App);
  478. {error, Reason} ->
  479. throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
  480. end.
  481. unload_moudle_and_app(App) ->
  482. case application:get_key(App, modules) of
  483. {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
  484. _ -> ok
  485. end,
  486. _ = application:unload(App),
  487. ok.
  488. is_needed_by_any(AppToStop, RunningApps) ->
  489. lists:any(fun({RunningApp, _RunningAppVsn}) ->
  490. is_needed_by(AppToStop, RunningApp)
  491. end, RunningApps).
  492. is_needed_by(AppToStop, AppToStop) -> false;
  493. is_needed_by(AppToStop, RunningApp) ->
  494. case application:get_key(RunningApp, applications) of
  495. {ok, Deps} -> lists:member(AppToStop, Deps);
  496. undefined -> false
  497. end.
  498. put_config(Key, Value) when is_atom(Key) ->
  499. put_config([Key], Value);
  500. put_config(Path, Value) when is_list(Path) ->
  501. emqx_config:put([?CONF_ROOT | Path], Value).
  502. get_config(Key, Default) when is_atom(Key) ->
  503. get_config([Key], Default);
  504. get_config(Path, Default) ->
  505. emqx:get_config([?CONF_ROOT | Path], Default).
  506. install_dir() -> get_config(install_dir, "").
  507. put_configured(Configured) ->
  508. ok = put_config(states, Configured).
  509. configured() ->
  510. get_config(states, []).
  511. for_plugins(ActionFun) ->
  512. case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
  513. [] -> ok;
  514. Errors -> erlang:error(#{function => ActionFun, errors => Errors})
  515. end.
  516. for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
  517. case Fun(NameVsn) of
  518. ok -> [];
  519. {error, Reason} -> [{NameVsn, Reason}]
  520. end;
  521. for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
  522. ?SLOG(debug, #{msg => "plugin_disabled",
  523. name_vsn => NameVsn}),
  524. [].
  525. parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
  526. parse_name_vsn(binary_to_list(NameVsn));
  527. parse_name_vsn(NameVsn) when is_list(NameVsn) ->
  528. {AppName, [$- | Vsn]} = lists:splitwith(fun(X) -> X =/= $- end, NameVsn),
  529. {list_to_atom(AppName), Vsn}.
  530. pkg_file(NameVsn) ->
  531. filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
  532. dir(NameVsn) ->
  533. filename:join([install_dir(), NameVsn]).
  534. info_file(NameVsn) ->
  535. filename:join([dir(NameVsn), "release.json"]).
  536. running_apps() ->
  537. lists:map(fun({N, _, V}) ->
  538. {N, V}
  539. end, application:which_applications(infinity)).