emqx_plugins.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include("emqx.hrl").
  18. -include("logger.hrl").
  19. -logger_header("[Plugins]").
  20. -export([init/0]).
  21. -export([ load/0
  22. , load/1
  23. , unload/0
  24. , unload/1
  25. , reload/1
  26. , list/0
  27. , find_plugin/1
  28. , generate_configs/1
  29. , apply_configs/1
  30. ]).
  31. -export([funlog/2]).
  32. -ifdef(TEST).
  33. -compile(export_all).
  34. -compile(nowarn_export_all).
  35. -endif.
  36. -dialyzer({no_match, [ plugin_loaded/2
  37. , plugin_unloaded/2
  38. ]}).
  39. %%--------------------------------------------------------------------
  40. %% APIs
  41. %%--------------------------------------------------------------------
  42. %% @doc Init plugins' config
  43. -spec(init() -> ok).
  44. init() ->
  45. case emqx:get_env(plugins_etc_dir) of
  46. undefined -> ok;
  47. PluginsEtc ->
  48. CfgFiles = [filename:join(PluginsEtc, File) ||
  49. File <- filelib:wildcard("*.config", PluginsEtc)],
  50. lists:foreach(fun init_config/1, CfgFiles)
  51. end.
  52. %% @doc Load all plugins when the broker started.
  53. -spec(load() -> ok | ignore | {error, term()}).
  54. load() ->
  55. ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
  56. case emqx:get_env(plugins_loaded_file) of
  57. undefined -> ignore; %% No plugins available
  58. File ->
  59. _ = ensure_file(File),
  60. with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
  61. end.
  62. %% @doc Load a Plugin
  63. -spec(load(atom()) -> ok | {error, term()}).
  64. load(PluginName) when is_atom(PluginName) ->
  65. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  66. {false, _} ->
  67. ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
  68. {error, not_found};
  69. {_, true} ->
  70. ?LOG(notice, "Plugin ~s is already started", [PluginName]),
  71. {error, already_started};
  72. {_, false} ->
  73. load_plugin(PluginName, true)
  74. end.
  75. %% @doc Unload all plugins before broker stopped.
  76. -spec(unload() -> list() | {error, term()}).
  77. unload() ->
  78. case emqx:get_env(plugins_loaded_file) of
  79. undefined -> ignore;
  80. File ->
  81. with_loaded_file(File, fun stop_plugins/1)
  82. end.
  83. %% @doc UnLoad a Plugin
  84. -spec(unload(atom()) -> ok | {error, term()}).
  85. unload(PluginName) when is_atom(PluginName) ->
  86. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  87. {false, _} ->
  88. ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
  89. {error, not_found};
  90. {_, false} ->
  91. ?LOG(error, "Plugin ~s is not started", [PluginName]),
  92. {error, not_started};
  93. {_, _} ->
  94. unload_plugin(PluginName, true)
  95. end.
  96. reload(PluginName) when is_atom(PluginName)->
  97. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  98. {false, _} ->
  99. ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
  100. {error, not_found};
  101. {_, false} ->
  102. load(PluginName);
  103. {_, true} ->
  104. case unload(PluginName) of
  105. ok -> load(PluginName);
  106. {error, Reason} -> {error, Reason}
  107. end
  108. end.
  109. %% @doc List all available plugins
  110. -spec(list() -> [emqx_types:plugin()]).
  111. list() ->
  112. StartedApps = names(started_app),
  113. lists:map(fun({Name, _, [Type| _]}) ->
  114. Plugin = plugin(Name, Type),
  115. case lists:member(Name, StartedApps) of
  116. true -> Plugin#plugin{active = true};
  117. false -> Plugin
  118. end
  119. end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
  120. find_plugin(Name) ->
  121. find_plugin(Name, list()).
  122. find_plugin(Name, Plugins) ->
  123. lists:keyfind(Name, 2, Plugins).
  124. %%--------------------------------------------------------------------
  125. %% Internal functions
  126. %%--------------------------------------------------------------------
  127. init_config(CfgFile) ->
  128. {ok, [AppsEnv]} = file:consult(CfgFile),
  129. lists:foreach(fun({App, Envs}) ->
  130. [application:set_env(App, Par, Val) || {Par, Val} <- Envs]
  131. end, AppsEnv).
  132. %% load external plugins which are placed in etc/plugins dir
  133. load_ext_plugins(undefined) -> ok;
  134. load_ext_plugins(Dir) ->
  135. lists:foreach(
  136. fun(Plugin) ->
  137. PluginDir = filename:join(Dir, Plugin),
  138. case filelib:is_dir(PluginDir) of
  139. true -> load_ext_plugin(PluginDir);
  140. false -> ok
  141. end
  142. end, filelib:wildcard("*", Dir)).
  143. load_ext_plugin(PluginDir) ->
  144. ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
  145. Ebin = filename:join([PluginDir, "ebin"]),
  146. AppFile = filename:join([Ebin, "*.app"]),
  147. AppName = case filelib:wildcard(AppFile) of
  148. [App] ->
  149. list_to_atom(filename:basename(App, ".app"));
  150. [] ->
  151. ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
  152. error({plugin_app_file_not_found, AppFile})
  153. end,
  154. ok = load_plugin_app(AppName, Ebin),
  155. try
  156. ok = load_plugin_conf(AppName, PluginDir)
  157. catch
  158. throw : {conf_file_not_found, ConfFile} ->
  159. %% this is maybe a dependency of an external plugin
  160. ?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
  161. ok
  162. end.
  163. load_plugin_app(AppName, Ebin) ->
  164. _ = code:add_patha(Ebin),
  165. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  166. lists:foreach(
  167. fun(BeamFile) ->
  168. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  169. case code:load_file(Module) of
  170. {module, _} -> ok;
  171. {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason})
  172. end
  173. end, Modules),
  174. case application:load(AppName) of
  175. ok -> ok;
  176. {error, {already_loaded, _}} -> ok
  177. end.
  178. load_plugin_conf(AppName, PluginDir) ->
  179. Priv = filename:join([PluginDir, "priv"]),
  180. Etc = filename:join([PluginDir, "etc"]),
  181. ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
  182. Conf = case filelib:is_file(ConfFile) of
  183. true -> cuttlefish_conf:file(ConfFile);
  184. false -> throw({conf_file_not_found, ConfFile})
  185. end,
  186. Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
  187. ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
  188. AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
  189. lists:foreach(fun({AppName1, Envs}) ->
  190. [application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs]
  191. end, AppsEnv).
  192. ensure_file(File) ->
  193. case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
  194. with_loaded_file(File, SuccFun) ->
  195. case read_loaded(File) of
  196. {ok, Names0} ->
  197. Names = filter_plugins(Names0),
  198. SuccFun(Names);
  199. {error, Error} ->
  200. ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]),
  201. {error, Error}
  202. end.
  203. filter_plugins(Names) ->
  204. lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
  205. ({Name1, true}) -> {true, Name1};
  206. ({_Name1, false}) -> false
  207. end, Names).
  208. load_plugins(Names, Persistent) ->
  209. Plugins = list(),
  210. NotFound = Names -- names(Plugins),
  211. case NotFound of
  212. [] -> ok;
  213. NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
  214. end,
  215. NeedToLoad = Names -- NotFound -- names(started_app),
  216. lists:foreach(fun(Name) ->
  217. Plugin = find_plugin(Name, Plugins),
  218. load_plugin(Plugin#plugin.name, Persistent)
  219. end, NeedToLoad).
  220. generate_configs(App) ->
  221. ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
  222. case filelib:is_file(ConfigFile) of
  223. true ->
  224. {ok, [Configs]} = file:consult(ConfigFile),
  225. Configs;
  226. false ->
  227. do_generate_configs(App)
  228. end.
  229. do_generate_configs(App) ->
  230. Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
  231. Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
  232. ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
  233. {true, _} -> Name1;
  234. {false, true} -> Name2;
  235. {false, false} -> error({config_not_found, [Name1, Name2]})
  236. end,
  237. SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
  238. case filelib:is_file(SchemaFile) of
  239. true ->
  240. Schema = cuttlefish_schema:files([SchemaFile]),
  241. Conf = cuttlefish_conf:file(ConfFile),
  242. cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
  243. false ->
  244. error({schema_not_found, SchemaFile})
  245. end.
  246. apply_configs([]) ->
  247. ok;
  248. apply_configs([{App, Config} | More]) ->
  249. lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
  250. lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
  251. apply_configs(More).
  252. %% Stop plugins
  253. stop_plugins(Names) ->
  254. _ = [stop_app(App) || App <- Names],
  255. ok.
  256. plugin(AppName, Type) ->
  257. case application:get_all_key(AppName) of
  258. {ok, Attrs} ->
  259. Descr = proplists:get_value(description, Attrs, ""),
  260. #plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
  261. undefined -> error({plugin_not_found, AppName})
  262. end.
  263. load_plugin(Name, Persistent) ->
  264. try
  265. Configs = ?MODULE:generate_configs(Name),
  266. ?MODULE:apply_configs(Configs),
  267. case load_app(Name) of
  268. ok ->
  269. start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
  270. {error, Error0} ->
  271. {error, Error0}
  272. end
  273. catch _ : Error : Stacktrace ->
  274. ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
  275. {error, parse_config_file_failed}
  276. end.
  277. load_app(App) ->
  278. case application:load(App) of
  279. ok ->
  280. ok;
  281. {error, {already_loaded, App}} ->
  282. ok;
  283. {error, Error} ->
  284. {error, Error}
  285. end.
  286. start_app(App, SuccFun) ->
  287. case application:ensure_all_started(App) of
  288. {ok, Started} ->
  289. ?LOG(info, "Started plugins: ~p", [Started]),
  290. ?LOG(info, "Load plugin ~s successfully", [App]),
  291. _ = SuccFun(App),
  292. ok;
  293. {error, {ErrApp, Reason}} ->
  294. ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
  295. {error, {ErrApp, Reason}}
  296. end.
  297. unload_plugin(App, Persistent) ->
  298. case stop_app(App) of
  299. ok ->
  300. _ = plugin_unloaded(App, Persistent), ok;
  301. {error, Reason} ->
  302. {error, Reason}
  303. end.
  304. stop_app(App) ->
  305. case application:stop(App) of
  306. ok ->
  307. ?LOG(info, "Stop plugin ~s successfully", [App]), ok;
  308. {error, {not_started, App}} ->
  309. ?LOG(error, "Plugin ~s is not started", [App]), ok;
  310. {error, Reason} ->
  311. ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
  312. end.
  313. names(plugin) ->
  314. names(list());
  315. names(started_app) ->
  316. [Name || {Name, _Descr, _Ver} <- application:which_applications()];
  317. names(Plugins) ->
  318. [Name || #plugin{name = Name} <- Plugins].
  319. plugin_loaded(_Name, false) ->
  320. ok;
  321. plugin_loaded(Name, true) ->
  322. case read_loaded() of
  323. {ok, Names} ->
  324. case lists:member(Name, Names) of
  325. false ->
  326. %% write file if plugin is loaded
  327. write_loaded(lists:append(Names, [{Name, true}]));
  328. true ->
  329. ignore
  330. end;
  331. {error, Error} ->
  332. ?LOG(error, "Cannot read loaded plugins: ~p", [Error])
  333. end.
  334. plugin_unloaded(_Name, false) ->
  335. ok;
  336. plugin_unloaded(Name, true) ->
  337. case read_loaded() of
  338. {ok, Names0} ->
  339. Names = filter_plugins(Names0),
  340. case lists:member(Name, Names) of
  341. true ->
  342. write_loaded(lists:delete(Name, Names));
  343. false ->
  344. ?LOG(error, "Cannot find ~s in loaded_file", [Name])
  345. end;
  346. {error, Error} ->
  347. ?LOG(error, "Cannot read loaded_plugins: ~p", [Error])
  348. end.
  349. read_loaded() ->
  350. case emqx:get_env(plugins_loaded_file) of
  351. undefined -> {error, not_found};
  352. File -> read_loaded(File)
  353. end.
  354. read_loaded(File) -> file:consult(File).
  355. write_loaded(AppNames) ->
  356. FilePath = emqx:get_env(plugins_loaded_file),
  357. case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of
  358. ok -> ok;
  359. {error, Error} ->
  360. ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
  361. {error, Error}
  362. end.
  363. plugin_type(auth) -> auth;
  364. plugin_type(protocol) -> protocol;
  365. plugin_type(backend) -> backend;
  366. plugin_type(bridge) -> bridge;
  367. plugin_type(_) -> feature.
  368. funlog(Key, Value) ->
  369. ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).