emqx_plugins.erl 14 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include("emqx.hrl").
  18. -include("logger.hrl").
  19. -logger_header("[Plugins]").
  20. -export([init/0]).
  21. -export([ load/0
  22. , load/1
  23. , unload/0
  24. , unload/1
  25. , reload/1
  26. , list/0
  27. , find_plugin/1
  28. , generate_configs/1
  29. , apply_configs/1
  30. ]).
  31. -ifdef(TEST).
  32. -compile(export_all).
  33. -compile(nowarn_export_all).
  34. -endif.
  35. -dialyzer({no_match, [ plugin_loaded/2
  36. , plugin_unloaded/2
  37. ]}).
  38. %%--------------------------------------------------------------------
  39. %% APIs
  40. %%--------------------------------------------------------------------
  41. %% @doc Init plugins' config
  42. -spec(init() -> ok).
  43. init() ->
  44. case emqx:get_env(plugins_etc_dir) of
  45. undefined -> ok;
  46. PluginsEtc ->
  47. CfgFiles = [filename:join(PluginsEtc, File) ||
  48. File <- filelib:wildcard("*.config", PluginsEtc)],
  49. lists:foreach(fun init_config/1, CfgFiles)
  50. end.
  51. %% @doc Load all plugins when the broker started.
  52. -spec(load() -> ok | ignore | {error, term()}).
  53. load() ->
  54. ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
  55. case emqx:get_env(plugins_loaded_file) of
  56. undefined -> ignore; %% No plugins available
  57. File ->
  58. _ = ensure_file(File),
  59. with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
  60. end.
  61. %% @doc Load a Plugin
  62. -spec(load(atom()) -> ok | {error, term()}).
  63. load(PluginName) when is_atom(PluginName) ->
  64. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  65. {false, _} ->
  66. ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
  67. {error, not_found};
  68. {_, true} ->
  69. ?LOG(notice, "Plugin ~s is already started", [PluginName]),
  70. {error, already_started};
  71. {_, false} ->
  72. load_plugin(PluginName, true)
  73. end.
  74. %% @doc Unload all plugins before broker stopped.
  75. -spec(unload() -> list() | {error, term()}).
  76. unload() ->
  77. case emqx:get_env(plugins_loaded_file) of
  78. undefined -> ignore;
  79. File ->
  80. with_loaded_file(File, fun stop_plugins/1)
  81. end.
  82. %% @doc UnLoad a Plugin
  83. -spec(unload(atom()) -> ok | {error, term()}).
  84. unload(PluginName) when is_atom(PluginName) ->
  85. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  86. {false, _} ->
  87. ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
  88. {error, not_found};
  89. {_, false} ->
  90. ?LOG(error, "Plugin ~s is not started", [PluginName]),
  91. {error, not_started};
  92. {_, _} ->
  93. unload_plugin(PluginName, true)
  94. end.
  95. reload(PluginName) when is_atom(PluginName)->
  96. case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
  97. {false, _} ->
  98. ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
  99. {error, not_found};
  100. {_, false} ->
  101. load(PluginName);
  102. {_, true} ->
  103. case unload(PluginName) of
  104. ok -> load(PluginName);
  105. {error, Reason} -> {error, Reason}
  106. end
  107. end.
  108. %% @doc List all available plugins
  109. -spec(list() -> [emqx_types:plugin()]).
  110. list() ->
  111. StartedApps = names(started_app),
  112. lists:map(fun({Name, _, [Type| _]}) ->
  113. Plugin = plugin(Name, Type),
  114. case lists:member(Name, StartedApps) of
  115. true -> Plugin#plugin{active = true};
  116. false -> Plugin
  117. end
  118. end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
  119. find_plugin(Name) ->
  120. find_plugin(Name, list()).
  121. find_plugin(Name, Plugins) ->
  122. lists:keyfind(Name, 2, Plugins).
  123. %%--------------------------------------------------------------------
  124. %% Internal functions
  125. %%--------------------------------------------------------------------
  126. init_config(CfgFile) ->
  127. {ok, [AppsEnv]} = file:consult(CfgFile),
  128. lists:foreach(fun({App, Envs}) ->
  129. [application:set_env(App, Par, Val) || {Par, Val} <- Envs]
  130. end, AppsEnv).
  131. %% load external plugins which are placed in etc/plugins dir
  132. load_ext_plugins(undefined) -> ok;
  133. load_ext_plugins(Dir) ->
  134. lists:foreach(
  135. fun(Plugin) ->
  136. PluginDir = filename:join(Dir, Plugin),
  137. case filelib:is_dir(PluginDir) of
  138. true -> load_ext_plugin(PluginDir);
  139. false -> ok
  140. end
  141. end, filelib:wildcard("*", Dir)).
  142. load_ext_plugin(PluginDir) ->
  143. ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
  144. Ebin = filename:join([PluginDir, "ebin"]),
  145. AppFile = filename:join([Ebin, "*.app"]),
  146. AppName = case filelib:wildcard(AppFile) of
  147. [App] ->
  148. list_to_atom(filename:basename(App, ".app"));
  149. [] ->
  150. ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
  151. error({plugin_app_file_not_found, AppFile})
  152. end,
  153. ok = load_plugin_app(AppName, Ebin),
  154. ok = load_plugin_conf(AppName, PluginDir).
  155. load_plugin_app(AppName, Ebin) ->
  156. _ = code:add_patha(Ebin),
  157. Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
  158. lists:foreach(
  159. fun(BeamFile) ->
  160. Module = list_to_atom(filename:basename(BeamFile, ".beam")),
  161. case code:ensure_loaded(Module) of
  162. {module, Module} -> ok;
  163. {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason})
  164. end
  165. end, Modules),
  166. case application:load(AppName) of
  167. ok -> ok;
  168. {error, {already_loaded, _}} -> ok
  169. end.
  170. load_plugin_conf(AppName, PluginDir) ->
  171. Priv = filename:join([PluginDir, "priv"]),
  172. Etc = filename:join([PluginDir, "etc"]),
  173. Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
  174. ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
  175. Conf = case filelib:is_file(ConfFile) of
  176. true -> cuttlefish_conf:file(ConfFile);
  177. false -> error({conf_file_not_found, ConfFile})
  178. end,
  179. ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
  180. AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
  181. lists:foreach(fun({AppName1, Envs}) ->
  182. [application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs]
  183. end, AppsEnv).
  184. ensure_file(File) ->
  185. case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
  186. with_loaded_file(File, SuccFun) ->
  187. case read_loaded(File) of
  188. {ok, Names0} ->
  189. Names = filter_plugins(Names0),
  190. SuccFun(Names);
  191. {error, Error} ->
  192. ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]),
  193. {error, Error}
  194. end.
  195. filter_plugins(Names) ->
  196. lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
  197. ({Name1, true}) -> {true, Name1};
  198. ({_Name1, false}) -> false
  199. end, Names).
  200. load_plugins(Names, Persistent) ->
  201. Plugins = list(),
  202. NotFound = Names -- names(Plugins),
  203. case NotFound of
  204. [] -> ok;
  205. NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
  206. end,
  207. NeedToLoad = Names -- NotFound -- names(started_app),
  208. lists:foreach(fun(Name) ->
  209. Plugin = find_plugin(Name, Plugins),
  210. load_plugin(Plugin#plugin.name, Persistent)
  211. end, NeedToLoad).
  212. generate_configs(App) ->
  213. ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
  214. case filelib:is_file(ConfigFile) of
  215. true ->
  216. {ok, [Configs]} = file:consult(ConfigFile),
  217. Configs;
  218. false ->
  219. do_generate_configs(App)
  220. end.
  221. do_generate_configs(App) ->
  222. Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
  223. Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
  224. ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
  225. {true, _} -> Name1;
  226. {false, true} -> Name2;
  227. {false, false} -> error({config_not_found, [Name1, Name2]})
  228. end,
  229. SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
  230. case filelib:is_file(SchemaFile) of
  231. true ->
  232. Schema = cuttlefish_schema:files([SchemaFile]),
  233. Conf = cuttlefish_conf:file(ConfFile),
  234. LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end,
  235. cuttlefish_generator:map(Schema, Conf, undefined, LogFun);
  236. false ->
  237. error({schema_not_found, SchemaFile})
  238. end.
  239. apply_configs([]) ->
  240. ok;
  241. apply_configs([{App, Config} | More]) ->
  242. lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
  243. lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
  244. apply_configs(More).
  245. %% Stop plugins
  246. stop_plugins(Names) ->
  247. _ = [stop_app(App) || App <- Names],
  248. ok.
  249. plugin(AppName, Type) ->
  250. case application:get_all_key(AppName) of
  251. {ok, Attrs} ->
  252. Descr = proplists:get_value(description, Attrs, ""),
  253. #plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
  254. undefined -> error({plugin_not_found, AppName})
  255. end.
  256. load_plugin(Name, Persistent) ->
  257. try
  258. Configs = ?MODULE:generate_configs(Name),
  259. ?MODULE:apply_configs(Configs),
  260. case load_app(Name) of
  261. ok ->
  262. start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
  263. {error, Error0} ->
  264. {error, Error0}
  265. end
  266. catch _ : Error : Stacktrace ->
  267. ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
  268. {error, parse_config_file_failed}
  269. end.
  270. load_app(App) ->
  271. case application:load(App) of
  272. ok ->
  273. ok;
  274. {error, {already_loaded, App}} ->
  275. ok;
  276. {error, Error} ->
  277. {error, Error}
  278. end.
  279. start_app(App, SuccFun) ->
  280. case application:ensure_all_started(App) of
  281. {ok, Started} ->
  282. ?LOG(info, "Started plugins: ~p", [Started]),
  283. ?LOG(info, "Load plugin ~s successfully", [App]),
  284. _ = SuccFun(App),
  285. ok;
  286. {error, {ErrApp, Reason}} ->
  287. ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
  288. {error, {ErrApp, Reason}}
  289. end.
  290. unload_plugin(App, Persistent) ->
  291. case stop_app(App) of
  292. ok ->
  293. _ = plugin_unloaded(App, Persistent), ok;
  294. {error, Reason} ->
  295. {error, Reason}
  296. end.
  297. stop_app(App) ->
  298. case application:stop(App) of
  299. ok ->
  300. ?LOG(info, "Stop plugin ~s successfully", [App]), ok;
  301. {error, {not_started, App}} ->
  302. ?LOG(error, "Plugin ~s is not started", [App]), ok;
  303. {error, Reason} ->
  304. ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
  305. end.
  306. names(plugin) ->
  307. names(list());
  308. names(started_app) ->
  309. [Name || {Name, _Descr, _Ver} <- application:which_applications()];
  310. names(Plugins) ->
  311. [Name || #plugin{name = Name} <- Plugins].
  312. plugin_loaded(_Name, false) ->
  313. ok;
  314. plugin_loaded(Name, true) ->
  315. case read_loaded() of
  316. {ok, Names} ->
  317. case lists:member(Name, Names) of
  318. false ->
  319. %% write file if plugin is loaded
  320. write_loaded(lists:append(Names, [{Name, true}]));
  321. true ->
  322. ignore
  323. end;
  324. {error, Error} ->
  325. ?LOG(error, "Cannot read loaded plugins: ~p", [Error])
  326. end.
  327. plugin_unloaded(_Name, false) ->
  328. ok;
  329. plugin_unloaded(Name, true) ->
  330. case read_loaded() of
  331. {ok, Names0} ->
  332. Names = filter_plugins(Names0),
  333. case lists:member(Name, Names) of
  334. true ->
  335. write_loaded(lists:delete(Name, Names));
  336. false ->
  337. ?LOG(error, "Cannot find ~s in loaded_file", [Name])
  338. end;
  339. {error, Error} ->
  340. ?LOG(error, "Cannot read loaded_plugins: ~p", [Error])
  341. end.
  342. read_loaded() ->
  343. case emqx:get_env(plugins_loaded_file) of
  344. undefined -> {error, not_found};
  345. File -> read_loaded(File)
  346. end.
  347. read_loaded(File) -> file:consult(File).
  348. write_loaded(AppNames) ->
  349. FilePath = emqx:get_env(plugins_loaded_file),
  350. case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of
  351. ok -> ok;
  352. {error, Error} ->
  353. ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
  354. {error, Error}
  355. end.
  356. plugin_type(auth) -> auth;
  357. plugin_type(protocol) -> protocol;
  358. plugin_type(backend) -> backend;
  359. plugin_type(bridge) -> bridge;
  360. plugin_type(_) -> feature.