emqx_plugins.erl 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include("emqx_plugins.hrl").
  19. -ifdef(TEST).
  20. -include_lib("eunit/include/eunit.hrl").
  21. -endif.
  22. -export([
  23. describe/1,
  24. plugin_avsc/1,
  25. plugin_i18n/1,
  26. plugin_avro/1,
  27. parse_name_vsn/1,
  28. make_name_vsn_string/2
  29. ]).
  30. %% Package operations
  31. -export([
  32. ensure_installed/1,
  33. ensure_uninstalled/1,
  34. ensure_enabled/1,
  35. ensure_enabled/2,
  36. ensure_enabled/3,
  37. ensure_disabled/1,
  38. purge/1,
  39. delete_package/1
  40. ]).
  41. %% Plugin runtime management
  42. -export([
  43. ensure_started/0,
  44. ensure_started/1,
  45. ensure_stopped/0,
  46. ensure_stopped/1,
  47. restart/1,
  48. list/0
  49. ]).
  50. %% Plugin config APIs
  51. -export([
  52. get_config/1,
  53. get_config/2,
  54. get_config/3,
  55. get_config/4,
  56. put_config/3
  57. ]).
  58. %% Package utils
  59. -export([
  60. decode_plugin_avro_config/2,
  61. install_dir/0,
  62. avsc_file_path/1
  63. ]).
  64. %% `emqx_config_handler' API
  65. -export([
  66. post_config_update/5
  67. ]).
  68. %% RPC call
  69. -export([get_tar/1]).
  70. %% Internal export
  71. -export([do_ensure_started/1]).
  72. %% for test cases
  73. -export([put_config_internal/2]).
  74. -ifdef(TEST).
  75. -compile(export_all).
  76. -compile(nowarn_export_all).
  77. -endif.
  78. %% Defines
  79. -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
  80. -define(RAW_BIN, binary).
  81. -define(JSON_MAP, json_map).
  82. %% "my_plugin-0.1.0"
  83. -type name_vsn() :: binary() | string().
  84. %% the parse result of the JSON info file
  85. -type plugin() :: map().
  86. -type schema_json() :: map().
  87. -type i18n_json() :: map().
  88. -type avro_binary() :: binary().
  89. -type plugin_config() :: map().
  90. -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
  91. %%--------------------------------------------------------------------
  92. %% APIs
  93. %%--------------------------------------------------------------------
  %% @doc Describe a plugin.
  %% Reads the plugin info with the `fill_readme' option switched on.
  -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
  describe(NameVsn) ->
      ReadOptions = #{fill_readme => true},
      read_plugin_info(NameVsn, ReadOptions).
  %% @doc Read the plugin's avsc file in JSON-map read mode.
  -spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
  plugin_avsc(NameVsn) ->
      read_plugin_avsc(NameVsn, #{read_mode => ?JSON_MAP}).

  %% @doc Read the plugin's i18n file in JSON-map read mode.
  -spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
  plugin_i18n(NameVsn) ->
      read_plugin_i18n(NameVsn, #{read_mode => ?JSON_MAP}).

  %% @doc Read the plugin's avro config file in raw-binary read mode.
  -spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
  plugin_avro(NameVsn) ->
      read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}).
  %% @doc Split a "name-vsn" string or binary at the FIRST dash.
  %% Returns `{ok, AppNameAtom, VsnString}', or `{error, "bad_name_vsn"}' when
  %% the input contains no dash at all.
  %% NOTE(review): the name part is turned into an atom with list_to_atom/1;
  %% callers should only pass trusted names, since atoms are never reclaimed.
  -spec parse_name_vsn(binary() | string()) -> {ok, atom(), string()} | {error, string()}.
  parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
      parse_name_vsn(binary_to_list(NameVsn));
  parse_name_vsn(NameVsn) when is_list(NameVsn) ->
      IsNotDash = fun(Char) -> Char =/= $- end,
      {NamePart, Tail} = lists:splitwith(IsNotDash, NameVsn),
      case Tail of
          [$- | Vsn] -> {ok, list_to_atom(NamePart), Vsn};
          _ -> {error, "bad_name_vsn"}
      end.
  %% @doc Join name and version with a dash and return the result as a string.
  %% Both arguments are treated as iodata.
  -spec make_name_vsn_string(iodata(), iodata()) -> string().
  make_name_vsn_string(Name, Vsn) ->
      Joined = iolist_to_binary([Name, $-, Vsn]),
      binary_to_list(Joined).
  116. %%--------------------------------------------------------------------
  117. %% Package operations
  %% @doc Install a .tar.gz package placed in install_dir.
  %% Nothing is done when the plugin info can already be read. Otherwise the
  %% plugin is purged and the package is extracted again.
  %% A failing purge is reported as an error map; previously the
  %% `ok = purge(NameVsn)' assertion crashed the caller with `badmatch'.
  -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  ensure_installed(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {ok, _} ->
              ok;
          {error, _} ->
              case purge(NameVsn) of
                  ok ->
                      do_ensure_installed(NameVsn);
                  {error, Reason} ->
                      {error, #{
                          error_msg => "failed_to_purge_plugin_dir",
                          name_vsn => NameVsn,
                          reason => Reason
                      }}
              end
      end.
  %% @doc Ensure files and directories for the given plugin are being deleted.
  %% An error is returned when the plugin's running status is anything other
  %% than `stopped', or when its config status is `enabled'.
  -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  ensure_uninstalled(NameVsn) ->
      uninstall_by_status(read_plugin_info(NameVsn, #{}), NameVsn).

  %% Dispatch on the plugin info read result; clause order matters because the
  %% running status is checked before the config status.
  uninstall_by_status({ok, #{running_status := RunningSt}}, _NameVsn) when
      RunningSt =/= stopped
  ->
      {error, #{
          error_msg => "bad_plugin_running_status",
          hint => "stop_the_plugin_first"
      }};
  uninstall_by_status({ok, #{config_status := enabled}}, _NameVsn) ->
      {error, #{
          error_msg => "bad_plugin_config_status",
          hint => "disable_the_plugin_first"
      }};
  uninstall_by_status(_InfoOrError, NameVsn) ->
      %% the purge result is intentionally not inspected here
      purge(NameVsn),
      ensure_delete(NameVsn).
  %% @doc Ensure a plugin is enabled, using position `no_move' and the
  %% `local' config location.
  -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  ensure_enabled(NameVsn) ->
      ensure_state(NameVsn, no_move, true, local).

  %% @doc Ensure a plugin is enabled at the given position of the plugin list,
  %% using the `local' config location.
  -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position) ->
      ensure_state(NameVsn, Position, true, local).

  %% @doc Same as ensure_enabled/2 with an explicit config location, which
  %% must be either `local' or `global'.
  -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position, ConfLocation) when
      ConfLocation =:= local orelse ConfLocation =:= global
  ->
      ensure_state(NameVsn, Position, true, ConfLocation).
  %% @doc Ensure a plugin is disabled.
  %% The plugin keeps its position (`no_move'); the `local' config location
  %% is used.
  -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  ensure_disabled(NameVsn) ->
      Position = no_move,
      Enabled = false,
      ensure_state(NameVsn, Position, Enabled, local).
  %% @doc Delete extracted dir
  %% In case one lib is shared by multiple plugins.
  %% it might be the case that purging one plugin's install dir
  %% will cause deletion of loaded beams.
  %% It should not be a problem, because shared lib should
  %% reside in all the plugin install dirs.
  %%
  %% The cached plugin config is dropped first, then the plugin directory is
  %% removed. The result of the directory removal is returned, so a failure
  %% surfaces as `{error, Reason}' (the previous spec only declared `ok').
  -spec purge(name_vsn()) -> ok | {error, term()}.
  purge(NameVsn) ->
      _ = maybe_purge_plugin_config(NameVsn),
      purge_plugin(NameVsn).
  %% @doc Delete the package file.
  %% The plugin's schema is deleted through emqx_plugins_serde first.
  %% A missing package file counts as success; any other failure is logged
  %% and returned as `{error, Reason}' (the previous spec only declared `ok').
  -spec delete_package(name_vsn()) -> ok | {error, file:posix() | badarg}.
  delete_package(NameVsn) ->
      File = pkg_file_path(NameVsn),
      _ = emqx_plugins_serde:delete_schema(NameVsn),
      case file:delete(File) of
          ok ->
              %% this deletes the package file, not the plugin directory,
              %% so the log message says so
              ?SLOG(info, #{msg => "deleted_plugin_package_file", path => File}),
              ok;
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_delete_package_file",
                  path => File,
                  reason => Reason
              }),
              {error, Reason}
      end.
  193. %%--------------------------------------------------------------------
  194. %% Plugin runtime management
  %% @doc Ensure all configured plugins are started.
  %% Applies do_ensure_started/1 through for_plugins/1 and asserts an `ok'
  %% result.
  -spec ensure_started() -> ok.
  ensure_started() ->
      StartOne = fun ?MODULE:do_ensure_started/1,
      ok = for_plugins(StartOne).
  %% @doc Start a plugin from Management API or CLI.
  %% the input is a <name>-<vsn> string.
  %% A failure is logged at `alert' level and returned unchanged.
  -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  ensure_started(NameVsn) ->
      case do_ensure_started(NameVsn) of
          ok ->
              ok;
          {error, Reason} ->
              %% tryit/2 returns `{failed, WhichOp}' (a tuple) for unexpected
              %% exceptions, so the reason is not always a map; a map update
              %% on it would raise `badmap'.
              LogData =
                  case is_map(Reason) of
                      true -> Reason;
                      false -> #{reason => Reason}
                  end,
              ?SLOG(alert, LogData#{msg => "failed_to_start_plugin"}),
              {error, Reason}
      end.
  %% @doc Stop all plugins before broker stops.
  %% Applies ensure_stopped/1 through for_plugins/1.
  -spec ensure_stopped() -> ok.
  ensure_stopped() ->
      StopOne = fun ?MODULE:ensure_stopped/1,
      for_plugins(StopOne).
  %% @doc Stop a plugin from Management API or CLI.
  %% Reads the plugin info and stops its apps; the whole operation runs
  %% inside tryit/2.
  -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  ensure_stopped(NameVsn) ->
      StopPlugin = fun() -> ensure_apps_stopped(do_read_plugin(NameVsn)) end,
      tryit("stop_plugin", StopPlugin).
  %% @doc Get the plugin config for a separately given name and version.
  %% The lookup key is normalised in get_config/3, so this variant reads the
  %% same entry as get_config/1 (it used to look up a string key while
  %% get_config/1 used a binary key).
  get_config(Name, Vsn, Options, Default) ->
      get_config(make_name_vsn_string(Name, Vsn), Options, Default).

  %% @doc Get the plugin config in map format; `#{}' when nothing is stored.
  -spec get_config(name_vsn()) ->
      {ok, plugin_config()}
      | {error, term()}.
  get_config(NameVsn) ->
      get_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}).

  %% @doc Get the plugin config in the format selected by `Options'.
  %% The avro format reads the avro config file; the map format reads the
  %% value cached in persistent_term.
  -spec get_config(name_vsn(), Options :: map()) ->
      {ok, avro_binary() | plugin_config()}
      | {error, term()}.
  get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
      %% no default value when get raw binary config
      case read_plugin_avro(NameVsn) of
          {ok, _AvroJson} = Res -> Res;
          {error, _Reason} = Err -> Err
      end;
  get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) ->
      get_config(NameVsn, Options, #{}).

  %% Map format with an explicit default. The name-vsn is converted with
  %% bin/1 so string and binary callers address the same persistent_term key.
  get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
      {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}.
  %% @doc Update plugin's config.
  %% RPC call from Management API or CLI.
  %% the avro Json Map and plugin config ALWAYS be valid before calling this function.
  %% The JSON-encoded map is written to the avro config file and the map
  %% itself is cached in persistent_term. The cache key is normalised with
  %% bin/1, the representation get_config/1 uses for its lookup, so string
  %% and binary callers write the same entry.
  put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
      AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
      ok = write_avro_bin(NameVsn, AvroJsonBin),
      ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), AvroJsonMap),
      ok.
  %% @doc Stop and then start the plugin.
  %% The start is attempted only when the stop returned `ok'; a stop error is
  %% returned as is.
  restart(NameVsn) ->
      case ensure_stopped(NameVsn) of
          {error, _Reason} = StopError -> StopError;
          ok -> ensure_started(NameVsn)
      end.
  %% @doc List all installed plugins.
  %% Including the ones that are installed, but not enabled in config.
  %% Every `release.json' one level below the install dir is considered;
  %% plugins whose info cannot be read are logged and left out.
  -spec list() -> [plugin()].
  list() ->
      Pattern = filename:join([install_dir(), "*", "release.json"]),
      All = lists:filtermap(
          fun(JsonFilePath) ->
              %% the directory holding release.json is the name-vsn
              [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)),
              case read_plugin_info(NameVsn, #{}) of
                  {ok, Info} ->
                      {true, Info};
                  {error, Reason} when is_map(Reason) ->
                      ?SLOG(warning, Reason),
                      false;
                  {error, Reason} ->
                      %% tryit/2 yields a `{failed, WhichOp}' tuple for
                      %% unexpected exceptions; wrap it so the logger always
                      %% receives a map report.
                      ?SLOG(warning, #{
                          msg => "failed_to_read_plugin_info",
                          name_vsn => NameVsn,
                          reason => Reason
                      }),
                      false
              end
          end,
          filelib:wildcard(Pattern)
      ),
      do_list(configured(), All).
  277. %%--------------------------------------------------------------------
  278. %% Package utils
  %% Decode a plugin's avro config via emqx_plugins_serde.
  %% A map input is JSON-encoded first and then decoded like a binary.
  -spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
  decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
      Encoded = emqx_utils_json:encode(AvroJsonMap),
      decode_plugin_avro_config(NameVsn, Encoded);
  decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
      case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
          {ok, _Config} = Decoded -> Decoded;
          {error, _ReasonMap} = Failed -> Failed
      end.
  %% Read a value below the plugins config root via emqx_conf.
  %% Accepts either a single atom key or a key path (list).
  get_config_interal(Key, Default) when is_atom(Key) ->
      emqx_conf:get([?CONF_ROOT, Key], Default);
  get_config_interal(Path, Default) ->
      FullPath = [?CONF_ROOT | Path],
      emqx_conf:get(FullPath, Default).
  %% Store a plugins config value using the `local' config location.
  put_config_internal(Key, Value) ->
      ConfLocation = local,
      do_put_config_internal(Key, Value, ConfLocation).
  %% Return the content of the plugin's package file.
  %% When the package file cannot be read, an attempt is made to re-create it
  %% from the plugin directory, and the freshly created file is read instead.
  %% (The spec previously used the atom `any' where the type `any()' was
  %% meant.)
  -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any()}.
  get_tar(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case file:read_file(TarGz) of
          {ok, Content} ->
              {ok, Content};
          {error, _} ->
              case maybe_create_tar(NameVsn, TarGz, install_dir()) of
                  ok ->
                      file:read_file(TarGz);
                  Err ->
                      Err
              end
      end.
  307. %%--------------------------------------------------------------------
  308. %% Internal
  309. %%--------------------------------------------------------------------
  %% Create a compressed tar from the files found below the plugin directory.
  %% Archive entry names are the file paths with the install dir prefix
  %% removed. Returns `{error, plugin_not_found}' when no file is found.
  maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
      maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
      Pattern = filename:join(plugin_dir(NameVsn), "**"),
      case filelib:wildcard(Pattern) of
          [] ->
              {error, plugin_not_found};
          PluginFiles ->
              %% exactly one trailing slash on the prefix to strip
              Prefix = string:trim(InstallDir, trailing, "/") ++ "/",
              Entries = lists:map(
                  fun(File) -> {string:prefix(File, Prefix), File} end,
                  PluginFiles
              ),
              erl_tar:create(TarGzName, Entries, [compressed])
      end.
  %% Write every `{Name, Bin}' entry of an in-memory tar extraction below
  %% BaseDir, creating parent directories as needed. Any failure crashes.
  write_tar_file_content(_BaseDir, []) ->
      ok;
  write_tar_file_content(BaseDir, [{Name, Bin} | Rest]) ->
      Target = filename:join(BaseDir, Name),
      ok = filelib:ensure_dir(Target),
      ok = file:write_file(Target, Bin),
      write_tar_file_content(BaseDir, Rest).
  %% Remove what a tar extraction created below BaseDir.
  %% For every entry that still exists, the top-level directory (or file)
  %% directly below BaseDir is deleted recursively.
  delete_tar_file_content(_BaseDir, []) ->
      ok;
  delete_tar_file_content(BaseDir, [{Name, _Bin} | Rest]) ->
      Path = filename:join(BaseDir, Name),
      case filelib:is_file(Path) of
          false ->
              %% probably already deleted
              ok;
          true ->
              ok = file:del_dir_r(top_dir(BaseDir, Path))
      end,
      delete_tar_file_content(BaseDir, Rest).
  %% Find the ancestor of DirOrFile (or DirOrFile itself) that sits directly
  %% below the base directory.
  %% Throws `{out_of_bounds, Path}' when the walk up reaches "/" or "."
  %% without passing through the base directory.
  top_dir(BaseDir0, DirOrFile) ->
      climb_to_top(normalize_dir(BaseDir0), DirOrFile).

  %% Walk up one directory level at a time until the parent is BaseDir.
  climb_to_top(BaseDir, Path) ->
      case filename:dirname(Path) of
          "/" ->
              throw({out_of_bounds, Path});
          "." ->
              throw({out_of_bounds, Path});
          BaseDir ->
              Path;
          Parent ->
              climb_to_top(BaseDir, Parent)
      end.

  %% Get rid of a possible trailing slash.
  normalize_dir(Dir) ->
      filename:join([Dir, ""]).
  -ifdef(TEST).
  %% normalize_dir/1 drops a trailing slash and leaves other paths alone.
  normalize_dir_test_() ->
      Cases = [
          {"foo", "foo"},
          {"foo", "foo/"},
          {"/foo", "/foo"},
          {"/foo", "/foo/"}
      ],
      [?_assertEqual(Expected, normalize_dir(Input)) || {Expected, Input} <- Cases].

  %% top_dir/2 returns the entry directly below the base dir, or throws when
  %% the path is not below the base dir.
  top_dir_test_() ->
      RelPath = filename:join(["base", "foo", "bar"]),
      AbsPath = filename:join(["/", "base", "foo", "bar"]),
      [
          ?_assertEqual("base/foo", top_dir("base", RelPath)),
          ?_assertEqual("/base/foo", top_dir("/base", AbsPath)),
          ?_assertEqual("/base/foo", top_dir("/base/", AbsPath)),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
      ].
  -endif.
  %% Extract the plugin package into the install dir and verify that the
  %% plugin info can be read afterwards. When it cannot, the extracted
  %% content is removed again and the read error is returned.
  do_ensure_installed(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case erl_tar:extract(TarGz, [compressed, memory]) of
          {ok, TarContent} ->
              ok = write_tar_file_content(install_dir(), TarContent),
              case read_plugin_info(NameVsn, #{}) of
                  {ok, _} ->
                      ok = maybe_post_op_after_install(NameVsn),
                      ok;
                  {error, Reason} ->
                      %% read_plugin_info/2 goes through tryit/2, which may
                      %% return a `{failed, WhichOp}' tuple. A map update on
                      %% it would raise `badmap' and skip the cleanup below.
                      LogData =
                          case is_map(Reason) of
                              true -> Reason;
                              false -> #{reason => Reason}
                          end,
                      ?SLOG(warning, LogData#{msg => "failed_to_read_after_install"}),
                      ok = delete_tar_file_content(install_dir(), TarContent),
                      {error, Reason}
              end;
          {error, {_, enoent}} ->
              {error, #{
                  error_msg => "failed_to_extract_plugin_package",
                  path => TarGz,
                  reason => not_found
              }};
          {error, Reason} ->
              {error, #{
                  error_msg => "bad_plugin_package",
                  path => TarGz,
                  reason => Reason
              }}
      end.
  %% Remove the plugin's entry from the configured plugin list and store the
  %% remaining entries.
  ensure_delete(NameVsn0) ->
      Target = bin(NameVsn0),
      Configured = configured(),
      IsOther = fun(#{name_vsn := Other}) -> bin(Other) =/= Target end,
      put_configured(lists:filter(IsOther, Configured)),
      ok.
  %% Record the enabled/disabled state of an installed plugin in the
  %% configured plugin list. Fails with the read error when the plugin info
  %% cannot be read.
  ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
      ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  ensure_state(NameVsn, Position, State, ConfLocation) ->
      case read_plugin_info(NameVsn, #{}) of
          {error, _Reason} = ReadError ->
              ReadError;
          {ok, _Info} ->
              Item = #{name_vsn => NameVsn, enable => State},
              Configure = fun() -> ensure_configured(Item, Position, ConfLocation) end,
              tryit("ensure_state", Configure)
      end.
  %% Insert or update the item in the configured plugin list and store the
  %% new list at the given config location.
  ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
      IsOther = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      {Front, Rear} = lists:splitwith(IsOther, configured()),
      NewConfigured = place_configured(Position, Front, Rear, Item),
      ok = put_configured(NewConfigured, ConfLocation).

  %% Already configured and not moving: replace the entry in place.
  place_configured(no_move, Front, [_Old | More], Item) ->
      Front ++ [Item | More];
  %% Already configured and moving: drop the old entry, insert at Position.
  place_configured(Position, Front, [_Old | More], Item) ->
      add_new_configured(Front ++ More, Position, Item);
  %% Not configured yet: Front is the whole configured list.
  place_configured(Position, Front, [], Item) ->
      add_new_configured(Front, Position, Item).
  %% Insert Item into the configured list at the requested position.
  %% `no_move' defaults to `rear'. For `{before | behind, Anchor}' the anchor
  %% plugin must be present in the list, otherwise an error map is thrown.
  add_new_configured(Configured, no_move, Item) ->
      %% default to rear
      add_new_configured(Configured, rear, Item);
  add_new_configured(Configured, front, Item) ->
      [Item | Configured];
  add_new_configured(Configured, rear, Item) ->
      Configured ++ [Item];
  add_new_configured(Configured, {Action, NameVsn}, Item) ->
      IsNotAnchor = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      case lists:splitwith(IsNotAnchor, Configured) of
          {_All, []} ->
              throw(#{
                  error_msg => "position_anchor_plugin_not_configured",
                  hint => "maybe_install_and_configure",
                  name_vsn => NameVsn
              });
          {Front, [Anchor | AfterAnchor] = Rear} ->
              case Action of
                  before -> Front ++ [Item | Rear];
                  behind -> Front ++ [Anchor, Item | AfterAnchor]
              end
      end.
  %% Drop the plugin config cached in persistent_term.
  %% The name-vsn may be given as a string or as a binary while the cache
  %% entry may have been stored under the other representation, so the entry
  %% is erased under both forms.
  %% NOTE(review): assumes bin/1 returns a binary - confirm against its
  %% definition.
  maybe_purge_plugin_config(NameVsn) ->
      BinKey = bin(NameVsn),
      lists:foreach(
          fun(Key) ->
              _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(Key))
          end,
          [BinKey, binary_to_list(BinKey)]
      ),
      ok.
  %% Remove the plugin's directory.
  purge_plugin(NameVsn) ->
      purge_plugin_dir(plugin_dir(NameVsn)).
  %% Recursively delete Dir. A missing directory counts as success; any other
  %% failure is logged and returned.
  purge_plugin_dir(Dir) ->
      log_purge_result(file:del_dir_r(Dir), Dir).

  log_purge_result(ok, Dir) ->
      ?SLOG(info, #{
          msg => "purged_plugin_dir",
          dir => Dir
      });
  log_purge_result({error, enoent}, _Dir) ->
      ok;
  log_purge_result({error, Reason}, Dir) ->
      ?SLOG(error, #{
          msg => "failed_to_purge_plugin_dir",
          dir => Dir,
          reason => Reason
      }),
      {error, Reason}.
  %% Make sure configured ones are ordered in front.
  %% Configured plugins come first, in configuration order, followed by the
  %% remaining installed ones. A configured plugin that is not installed is
  %% logged and skipped.
  do_list([], All) ->
      All;
  do_list([#{name_vsn := NameVsn} | Rest], All) ->
      IsOther = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          bin([Name, "-", Vsn]) =/= bin(NameVsn)
      end,
      {Front, Tail} = lists:splitwith(IsOther, All),
      case Tail of
          [Found | Rear] ->
              [Found | do_list(Rest, Front ++ Rear)];
          [] ->
              ?SLOG(warning, #{
                  msg => "configured_plugin_not_installed",
                  name_vsn => NameVsn
              }),
              do_list(Rest, All)
      end.
  %% Make sure the plugin is installed (fetching the package if needed), then
  %% load its code and start its apps. Runs inside tryit/2.
  %% A plugin that cannot be found anywhere is logged and treated as `ok'.
  do_ensure_started(NameVsn) ->
      tryit(
          "start_plugins",
          fun() ->
              case ensure_exists_and_installed(NameVsn) of
                  ok ->
                      Plugin = do_read_plugin(NameVsn),
                      ok = load_code_start_apps(NameVsn, Plugin);
                  {error, plugin_not_found} ->
                      %% use the `msg' key like every other log call in this
                      %% module (this one used `error_msg')
                      ?SLOG(error, #{
                          msg => "plugin_not_found",
                          name_vsn => NameVsn
                      }),
                      ok
              end
          end
      ).
  517. %%--------------------------------------------------------------------
  %% Run F and translate exceptions into error returns:
  %% - a `throw' is a known error and becomes `{error, Thrown}';
  %% - an `error' is unexpected: it is logged with its stacktrace and becomes
  %%   `{error, {failed, WhichOp}}'.
  %% Exceptions of class `exit' are not caught.
  tryit(WhichOp, F) ->
      try F() of
          Result ->
              Result
      catch
          throw:KnownError ->
              %% no stacktrace needed for known errors
              {error, KnownError};
          error:Unexpected:Stack ->
              ?SLOG(warning, #{
                  msg => "plugin_op_failed",
                  which_op => WhichOp,
                  exception => Unexpected,
                  stacktrace => Stack
              }),
              {error, {failed, WhichOp}}
      end.
  %% Read plugin info from the JSON info file.
  %% Returns `{ok, Info}', or `{error, Reason}' as produced by tryit/2.
  read_plugin_info(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      ReadInfo = fun() ->
          Info = do_read_plugin2(NameVsn, Options),
          {ok, Info}
      end,
      tryit(OpName, ReadInfo).
  %% Read the plugin info with default options. May throw or crash; callers
  %% wrap it in tryit/2.
  do_read_plugin(NameVsn) ->
      do_read_plugin3(NameVsn, info_file_path(NameVsn), #{}).

  %% Same as do_read_plugin/1 with explicit options.
  do_read_plugin2(NameVsn, Option) ->
      InfoFilePath = info_file_path(NameVsn),
      do_read_plugin3(NameVsn, InfoFilePath, Option).

  %% Read and validate the info file, then add the readme (depending on the
  %% options) and the status fields.
  do_read_plugin3(NameVsn, InfoFilePath, Options) ->
      ReadInfoFile = read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}),
      {ok, PlainMap} = ReadInfoFile(),
      Checked = check_plugin(PlainMap, NameVsn, InfoFilePath),
      WithReadme = plugins_readme(NameVsn, Options, Checked),
      plugin_status(NameVsn, WithReadme).
  %% Read the avsc file; JSON-map read mode by default.
  read_plugin_avsc(NameVsn) ->
      read_plugin_avsc(NameVsn, #{read_mode => ?JSON_MAP}).
  read_plugin_avsc(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      Reader = read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options),
      tryit(OpName, Reader).

  %% Read the i18n file; JSON-map read mode by default.
  read_plugin_i18n(NameVsn) ->
      read_plugin_i18n(NameVsn, #{read_mode => ?JSON_MAP}).
  read_plugin_i18n(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      Reader = read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options),
      tryit(OpName, Reader).

  %% Read the avro config file; raw-binary read mode by default.
  read_plugin_avro(NameVsn) ->
      read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}).
  read_plugin_avro(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      Reader = read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options),
      tryit(OpName, Reader).
  %% Make sure the plugin directory exists. When it does not, install from
  %% the local package if there is one, otherwise fetch the package from
  %% another node of the cluster.
  ensure_exists_and_installed(NameVsn) ->
      install_if_missing(filelib:is_dir(plugin_dir(NameVsn)), NameVsn).

  install_if_missing(true, _NameVsn) ->
      ok;
  install_if_missing(false, NameVsn) ->
      %% Do we have the package, but it's not extracted yet?
      case get_tar(NameVsn) of
          {ok, TarContent} ->
              ok = file:write_file(pkg_file_path(NameVsn), TarContent),
              ok = do_ensure_installed(NameVsn);
          _NoLocalPackage ->
              %% If not, try to get it from the cluster.
              do_get_from_cluster(NameVsn)
      end.
  %% Fetch the plugin package from any other running node, write it to the
  %% local package path and install it. Returns `{error, plugin_not_found}'
  %% when no node could provide the package.
  do_get_from_cluster(NameVsn) ->
      Self = node(),
      Peers = lists:filter(fun(N) -> N /= Self end, mria:running_nodes()),
      case get_from_any_node(Peers, NameVsn, []) of
          {ok, TarContent} ->
              ok = file:write_file(pkg_file_path(NameVsn), TarContent),
              ok = do_ensure_installed(NameVsn);
          {error, _} when Peers =:= [] ->
              ?SLOG(error, #{
                  msg => "no_nodes_to_copy_plugin_from",
                  name_vsn => NameVsn
              }),
              {error, plugin_not_found};
          {error, NodeErrors} ->
              ?SLOG(error, #{
                  msg => "failed_to_copy_plugin_from_other_nodes",
                  name_vsn => NameVsn,
                  node_errors => NodeErrors
              }),
              {error, plugin_not_found}
      end.
  %% Ask the nodes one by one for the plugin package and stop at the first
  %% success. When every node fails, the per-node failures are returned as
  %% `{error, [{Node, Failure}]}' with the most recently tried node first.
  get_from_any_node([], _NameVsn, Errors) ->
      {error, Errors};
  get_from_any_node([Node | MoreNodes], NameVsn, Errors) ->
      case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
          {ok, _Tar} = Found ->
              Found;
          Failure ->
              get_from_any_node(MoreNodes, NameVsn, [{Node, Failure} | Errors])
      end.
  %% Add the readme file content to the info map when `fill_readme' is set.
  %% An unreadable readme file yields an empty binary.
  plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
      Readme =
          case file:read_file(readme_file(NameVsn)) of
              {ok, Bin} -> Bin;
              _ -> <<>>
          end,
      Info#{readme => Readme};
  plugins_readme(_NameVsn, _Options, Info) ->
      Info.
  %% Add `running_status' (running | loaded | stopped) and `config_status'
  %% (enabled | disabled | not_configured) to the plugin info map.
  plugin_status(NameVsn, Info) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      RunningSt = plugin_running_status(AppName),
      ConfSt = plugin_config_status(NameVsn),
      Info#{
          running_status => RunningSt,
          config_status => ConfSt
      }.

  %% `stopped' when the app is not loaded, `running' when it is among the
  %% running apps, `loaded' otherwise.
  plugin_running_status(AppName) ->
      case application:get_key(AppName, vsn) of
          {ok, _} ->
              case lists:keyfind(AppName, 1, running_apps()) of
                  {AppName, _} -> running;
                  _ -> loaded
              end;
          undefined ->
              stopped
      end.

  %% Derive the config status from the configured plugin list.
  %% When the same plugin is listed more than once, the first entry wins,
  %% which matches how ensure_configured/3 locates the entry. Previously a
  %% duplicated entry caused a `case_clause' crash.
  plugin_config_status(NameVsn) ->
      States = lists:filtermap(
          fun(#{name_vsn := Nv, enable := St}) ->
              case bin(Nv) =:= bin(NameVsn) of
                  true -> {true, St};
                  false -> false
              end
          end,
          configured()
      ),
      case States of
          [] -> not_configured;
          [true | _] -> enabled;
          [false | _] -> disabled
      end.
  %% Validate the parsed info file and return it unchanged.
  %% Throws an error map when
  %% - a mandatory field is missing,
  %% - name and rel_vsn do not add up to the expected name-vsn, or
  %% - rel_apps is not a non-empty list of <app>-<vsn> strings.
  check_plugin(
      #{
          <<"name">> := Name,
          <<"rel_vsn">> := Vsn,
          <<"rel_apps">> := Apps,
          <<"description">> := _
      } = Info,
      NameVsn,
      FilePath
  ) ->
      bin(NameVsn) =:= bin([Name, "-", Vsn]) orelse
          throw(#{
              error_msg => "name_vsn_mismatch",
              name_vsn => NameVsn,
              path => FilePath,
              name => Name,
              rel_vsn => Vsn
          }),
      AssertAppNameVsn = fun(App) -> {ok, _, _} = parse_name_vsn(App) end,
      try
          %% assert non-empty list
          [_ | _] = Apps,
          %% validate if the list is all <app>-<vsn> strings
          lists:foreach(AssertAppNameVsn, Apps)
      catch
          _:_ ->
              throw(#{
                  error_msg => "bad_rel_apps",
                  rel_apps => Apps,
                  hint => "A non-empty string list of app_name-app_vsn format"
              })
      end,
      Info;
  check_plugin(_What, NameVsn, File) ->
      throw(#{
          error_msg => "bad_info_file_content",
          mandatory_fields => [rel_vsn, name, rel_apps, description],
          name_vsn => NameVsn,
          path => File
      }).
  %% Load every app listed in the plugin's rel_apps (code and app spec), then
  %% start them in the listed order. All apps are loaded before any is
  %% started.
  load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
      LibDir = filename:join([install_dir(), RelNameVsn]),
      RunningApps = running_apps(),
      LoadApp = fun(AppNameVsn) ->
          {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
          EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
          ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
          AppName
      end,
      AppNames = lists:map(LoadApp, Apps),
      lists:foreach(fun start_app/1, AppNames).
  %% Load a plugin app unless it is already running.
  %% An app running in a different version is left alone; only a warning is
  %% logged.
  load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
      load_unless_running(lists:keyfind(AppName, 1, RunningApps), AppName, AppVsn, Ebin).

  load_unless_running(false, AppName, _AppVsn, Ebin) ->
      do_load_plugin_app(AppName, Ebin);
  load_unless_running({_, RunningVsn}, AppName, AppVsn, _Ebin) ->
      case bin(RunningVsn) =/= bin(AppVsn) of
          true ->
              %% running but a different version
              ?SLOG(warning, #{
                  msg => "plugin_app_already_running",
                  name => AppName,
                  running_vsn => RunningVsn,
                  loading_vsn => AppVsn
              });
          false ->
              %% already started on the exact version
              ok
      end.
  %% Put the ebin dir in front of the code path, (re)load every beam file
  %% found in it and load the application spec.
  %% Throws an error map when a module or the application fails to load.
  do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
      do_load_plugin_app(AppName, binary_to_list(Ebin));
  do_load_plugin_app(AppName, Ebin) ->
      _ = code:add_patha(Ebin),
      BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
      lists:foreach(fun load_plugin_beam/1, BeamFiles),
      case application:load(AppName) of
          ok ->
              ok;
          {error, {already_loaded, _}} ->
              ok;
          {error, Reason} ->
              throw(#{
                  error_msg => "failed_to_load_plugin_app",
                  name => AppName,
                  reason => Reason
              })
      end.

  %% Purge old code of the module named after the beam file, then load it.
  load_plugin_beam(BeamFile) ->
      Module = list_to_atom(filename:basename(BeamFile, ".beam")),
      _ = code:purge(Module),
      case code:load_file(Module) of
          {module, _} ->
              ok;
          {error, Reason} ->
              throw(#{
                  error_msg => "failed_to_load_plugin_beam",
                  path => BeamFile,
                  reason => Reason
              })
      end.
  %% Start the app together with its dependencies.
  %% Throws an error map naming the app that failed to start.
  start_app(App) ->
      case application:ensure_all_started(App) of
          {error, {ErrApp, Reason}} ->
              throw(#{
                  error_msg => "failed_to_start_plugin_app",
                  app => App,
                  err_app => ErrApp,
                  reason => Reason
              });
          {ok, Started} ->
              %% only log the started list when something was started
              case Started of
                  [] -> ok;
                  _ -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started})
              end,
              ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
              ok
      end.
  %% Stop all apps installed by the plugin package,
  %% but not the ones shared with others.
  %% Apps that cannot be stopped because running apps depend on them are
  %% logged; the result is still `ok' in that case.
  ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
      %% resolve the app names from the <app>-<vsn> strings
      ToAppName = fun(NameVsn) ->
          {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
          AppName
      end,
      AppsToStop = lists:map(ToAppName, Apps),
      StopResult = tryit("stop_apps", fun() -> stop_apps(AppsToStop) end),
      case StopResult of
          {error, Reason} ->
              {error, Reason};
          {ok, []} ->
              %% all apps stopped
              ok;
          {ok, Left} ->
              ?SLOG(warning, #{
                  msg => "unabled_to_stop_plugin_apps",
                  apps => Left,
                  reason => "running_apps_still_depends_on_this_apps"
              }),
              ok
      end.
  %% Stop the given apps in passes, repeating until either everything is
  %% stopped or a pass makes no progress. Returns `{ok, Remaining}'.
  stop_apps(Apps) ->
      {ok, Remain} = do_stop_apps(Apps, [], running_apps()),
      case Remain of
          %% all stopped
          [] -> {ok, []};
          %% no progress
          Apps -> {ok, Apps};
          %% try again
          _ -> stop_apps(Remain)
      end.
  %% One pass over the apps to stop. An app that is_needed_by_any/2 reports
  %% as still required is deferred (collected in the accumulator); any other
  %% app is stopped via stop_app/1. Returns `{ok, Deferred}' with the
  %% deferred apps in their original relative order.
  do_stop_apps([], Deferred, _RunningApps) ->
      {ok, lists:reverse(Deferred)};
  do_stop_apps([App | Rest], Deferred, RunningApps) ->
      NextDeferred =
          case is_needed_by_any(App, RunningApps) of
              true ->
                  [App | Deferred];
              false ->
                  ok = stop_app(App),
                  Deferred
          end,
      do_stop_apps(Rest, NextDeferred, RunningApps).
  %% Stop a single application and then unload it with
  %% unload_moudle_and_app/1. An app that was not started is still unloaded.
  %% Returns `ok'. Throws a map tagged "failed_to_stop_app" for any other
  %% error returned by application:stop/1.
  stop_app(App) ->
      StopResult = application:stop(App),
      case StopResult of
          {error, {not_started, App}} ->
              ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
              ok = unload_moudle_and_app(App);
          ok ->
              ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
              ok = unload_moudle_and_app(App);
          {error, Reason} ->
              throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason})
      end.
  %% Soft-purge every module listed under the app's `modules' key (if the
  %% key can be read) and then unload the application spec.
  %% The result of application:unload/1 is deliberately ignored; always
  %% returns `ok'.
  unload_moudle_and_app(App) ->
      AppModules =
          case application:get_key(App, modules) of
              {ok, Modules} -> Modules;
              _ -> []
          end,
      lists:foreach(fun(Module) -> code:soft_purge(Module) end, AppModules),
      _ = application:unload(App),
      ok.
  %% Tell whether at least one entry of `RunningApps' (a list of
  %% `{App, Vsn}' pairs) depends on `AppToStop', as decided by
  %% is_needed_by/2. Returns a boolean.
  is_needed_by_any(AppToStop, RunningApps) ->
      %% "any depends" expressed as "not all are independent"
      IsIndependent = fun({RunningApp, _RunningAppVsn}) ->
          not is_needed_by(AppToStop, RunningApp)
      end,
      not lists:all(IsIndependent, RunningApps).
  %% Tell whether `RunningApp' lists `AppToStop' in its `applications' key.
  %% An app never counts as needed by itself, and an app whose key cannot be
  %% read (`undefined') is treated as not depending on anything.
  is_needed_by(SameApp, SameApp) ->
      false;
  is_needed_by(AppToStop, RunningApp) ->
      case application:get_key(RunningApp, applications) of
          undefined -> false;
          {ok, DependsOn} -> lists:member(AppToStop, DependsOn)
      end.
  %% Write `Values' (with top-level keys converted by bin_key/1) under the
  %% plugin config root at the given key or path.
  %% An atom key is wrapped into a single-element path. `local' updates via
  %% emqx:update_config/3, `global' via emqx_conf:update/3.
  %% Returns `ok' when the update returns `{ok, _}', otherwise the update's
  %% result unchanged.
  do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) ->
      do_put_config_internal([Key], Value, ConfLocation);
  do_put_config_internal(Path, Values, local) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      FullPath = [?CONF_ROOT | Path],
      %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
      case emqx:update_config(FullPath, bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Other -> Other
      end;
  do_put_config_internal(Path, Values, global) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      FullPath = [?CONF_ROOT | Path],
      case emqx_conf:update(FullPath, bin_key(Values), UpdateOpts) of
          {ok, _} -> ok;
          Other -> Other
      end.
  877. %%--------------------------------------------------------------------
  878. %% `emqx_config_handler' API
  879. %%--------------------------------------------------------------------
  %% Config-handler callback. For an update at the plugin config root, the
  %% new and old `states' lists are indexed by `name_vsn', diffed with
  %% emqx_utils_maps:diff_maps/2, and every changed entry is handed to
  %% enable_disable_plugin/2. Updates at any other path are ignored.
  %% Always returns `ok'.
  post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
      IndexByNameVsn = fun(States) ->
          maps:from_list([{NameVsn, State} || #{name_vsn := NameVsn} = State <- States])
      end,
      NewIndex = IndexByNameVsn(NewStates),
      OldIndex = IndexByNameVsn(OldStates),
      #{changed := Changed} = emqx_utils_maps:diff_maps(NewIndex, OldIndex),
      maps:foreach(
          fun(NameVsn, Diff) -> enable_disable_plugin(NameVsn, Diff) end,
          Changed
      ),
      ok;
  post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
      ok.
  %% React to a changed plugin state pair. When `enable' goes from `true' in
  %% the first element to `false' in the second, the plugin is stopped; the
  %% opposite transition starts it; anything else is a no-op.
  %% NOTE(review): the pair presumably is {Old, New} as produced by
  %% emqx_utils_maps:diff_maps/2 - confirm against that module.
  %% Results of ensure_stopped/1 and ensure_started/1 are ignored on purpose
  %% (errors are already logged in those functions). Always returns `ok'.
  enable_disable_plugin(NameVsn, Diff) ->
      case Diff of
          {#{enable := true}, #{enable := false}} ->
              _ = ensure_stopped(NameVsn),
              ok;
          {#{enable := false}, #{enable := true}} ->
              _ = ensure_started(NameVsn),
              ok;
          _NoToggle ->
              ok
      end.
  898. %%--------------------------------------------------------------------
  899. %% Helper functions
  900. %%--------------------------------------------------------------------
  %% Read the `install_dir' setting through get_config_interal/2, passing
  %% the empty string as the second argument.
  %% NOTE(review): the second argument is presumably the default - confirm
  %% against get_config_interal/2.
  install_dir() ->
      DefaultDir = "",
      get_config_interal(install_dir, DefaultDir).
  %% Store the configured plugin states using the `local' config location.
  put_configured(Configured) ->
      ConfLocation = local,
      put_configured(Configured, ConfLocation).
  %% Store the configured plugin states under the `states' key at the given
  %% config location. Crashes with badmatch unless the write returns `ok'.
  put_configured(Configured, ConfLocation) ->
      BinKeyed = bin_key(Configured),
      ok = do_put_config_internal(states, BinKeyed, ConfLocation).
  %% Read the `states' setting through get_config_interal/2, passing the
  %% empty list as the second argument.
  %% NOTE(review): the second argument is presumably the default - confirm
  %% against get_config_interal/2.
  configured() ->
      NoStates = [],
      get_config_interal(states, NoStates).
  %% Apply `ActionFun' to every configured plugin via for_plugin/2 and
  %% collect the reported failures. Returns `ok' when there are none;
  %% otherwise raises an error carrying the fun and the failure list.
  for_plugins(ActionFun) ->
      Failures = lists:append([for_plugin(Plugin, ActionFun) || Plugin <- configured()]),
      case Failures of
          [] -> ok;
          _ -> erlang:error(#{function => ActionFun, errors => Failures})
      end.
  %% Run `Fun' on one configured plugin entry.
  %% A disabled plugin is only logged and yields no failures. For an enabled
  %% plugin, `Fun(NameVsn)' must return `ok' or `{error, Reason}'; the latter
  %% is reported as `[{NameVsn, Reason}]'.
  for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
      ?SLOG(debug, #{
          msg => "plugin_disabled",
          name_vsn => NameVsn
      }),
      [];
  for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
      case Fun(NameVsn) of
          {error, Reason} -> [{NameVsn, Reason}];
          ok -> []
      end.
  %% Best-effort follow-up steps after a plugin is installed: try to load
  %% its config schema, then try to create its config directory.
  %% The outcome of both steps is ignored; always returns `ok'.
  maybe_post_op_after_install(NameVsn) ->
      _SchemaResult = maybe_load_config_schema(NameVsn),
      _ConfigDirResult = maybe_create_config_dir(NameVsn),
      ok.
  %% Load the plugin's config schema when its schema file exists as a
  %% regular file. Returns `false' when there is no such file, otherwise the
  %% result of do_load_config_schema/2.
  maybe_load_config_schema(NameVsn) ->
      AvscPath = avsc_file_path(NameVsn),
      case filelib:is_regular(AvscPath) of
          true -> do_load_config_schema(NameVsn, AvscPath);
          false -> false
      end.
  %% Register the plugin's config schema file with emqx_plugins_serde.
  %% This is best-effort and always returns `ok': an already registered
  %% schema is fine, and any other failure is logged as a warning instead of
  %% being silently dropped, so a broken schema file can be diagnosed.
  do_load_config_schema(NameVsn, AvscPath) ->
      case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
          ok ->
              ok;
          {error, already_exists} ->
              ok;
          {error, Reason} ->
              %% keep the best-effort contract, but leave a trace of the failure
              ?SLOG(warning, #{
                  msg => "failed_to_load_plugin_config_schema",
                  name_vsn => NameVsn,
                  path => AvscPath,
                  reason => Reason
              }),
              ok
      end.
  %% Make sure the plugin's config directory exists, using
  %% filelib:ensure_path/1. Returns `ok' on success; on failure logs a
  %% warning and returns `{error, {mkdir_failed, Dir, Reason}}'.
  maybe_create_config_dir(NameVsn) ->
      ConfigDir = plugin_config_dir(NameVsn),
      case filelib:ensure_path(ConfigDir) of
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_create_plugin_config_dir",
                  dir => ConfigDir,
                  reason => Reason
              }),
              {error, {mkdir_failed, ConfigDir, Reason}};
          ok ->
              ok
      end.
  %% Write `AvroBin' to the plugin's avro config file.
  %% Crashes with badmatch if the file cannot be written.
  write_avro_bin(NameVsn, AvroBin) ->
      TargetFile = avro_config_file(NameVsn),
      ok = file:write_file(TargetFile, AvroBin).
  %% Build a zero-arity fun that reads `Path' when called.
  %% With read mode ?RAW_BIN the fun returns `{ok, Binary}' from
  %% file:read_file/1. With read mode ?JSON_MAP it loads the file through
  %% hocon:load/2 and returns `{ok, PlainMap}'.
  %% In both modes a read failure makes the fun throw
  %% `#{error_msg => ErrMsg, reason => Reason}'.
  read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) ->
      fun() ->
          case file:read_file(Path) of
              {error, Reason} ->
                  throw(#{error_msg => ErrMsg, reason => Reason});
              {ok, Bin} ->
                  {ok, Bin}
          end
      end;
  read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
      fun() ->
          case hocon:load(Path, #{format => richmap}) of
              {error, Reason} ->
                  throw(#{error_msg => ErrMsg, reason => Reason});
              {ok, RichMap} ->
                  PlainMap = hocon_maps:ensure_plain(RichMap),
                  {ok, PlainMap}
          end
      end.
  %% Directories
  %% Directory of the given plugin: `NameVsn' joined onto install_dir/0.
  plugin_dir(NameVsn) ->
      InstallDir = install_dir(),
      filename:join(InstallDir, NameVsn).
  %% Config directory of the given plugin: "data/configs" below plugin_dir/1.
  plugin_config_dir(NameVsn) ->
      DataDir = filename:join(plugin_dir(NameVsn), "data"),
      filename:join(DataDir, "configs").
  979. %% Files
  %% Path of the plugin package: "<NameVsn>.tar.gz" below install_dir/0.
  pkg_file_path(NameVsn) ->
      InstallDir = install_dir(),
      PkgFileName = bin([NameVsn, ".tar.gz"]),
      filename:join(InstallDir, PkgFileName).
  %% Path of the plugin's "release.json" inside plugin_dir/1.
  info_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join(PluginDir, "release.json").
  %% Path of the plugin's "config_schema.avsc" inside plugin_dir/1.
  avsc_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join(PluginDir, "config_schema.avsc").
  %% Path of the plugin's "config.avro" inside plugin_config_dir/1.
  avro_config_file(NameVsn) ->
      ConfigDir = plugin_config_dir(NameVsn),
      filename:join(ConfigDir, "config.avro").
  %% Path of the plugin's "config_i18n.json" inside plugin_dir/1.
  i18n_file_path(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join(PluginDir, "config_i18n.json").
  %% Path of the plugin's "README.md" inside plugin_dir/1.
  readme_file(NameVsn) ->
      PluginDir = plugin_dir(NameVsn),
      filename:join(PluginDir, "README.md").
  %% List the currently running applications as `{Name, Vsn}' pairs,
  %% dropping the description element returned by
  %% application:which_applications/1 (called with an infinite timeout).
  running_apps() ->
      [{Name, Vsn} || {Name, _Description, Vsn} <- application:which_applications(infinity)].
  %% Convert the top-level keys of a map to binaries with bin/1; values are
  %% left untouched. A list whose first element is a map gets the same
  %% treatment applied to each of its elements. Any other term is returned
  %% unchanged.
  bin_key(Map) when is_map(Map) ->
      PutBinKey = fun(Key, Value, Acc) -> maps:put(bin(Key), Value, Acc) end,
      maps:fold(PutBinKey, #{}, Map);
  bin_key([#{} | _] = Items) ->
      [bin_key(Item) || Item <- Items];
  bin_key(Other) ->
      Other.
  %% Convert an atom, a (possibly nested) character list, or a binary to a
  %% binary. Binaries are returned as they are.
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8);
  bin(Chars) when is_list(Chars) ->
      unicode:characters_to_binary(Chars, utf8).