emqx_plugins.erl 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugins).
  17. -feature(maybe_expr, enable).
  18. -include("emqx_plugins.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -include_lib("snabbkaffe/include/trace.hrl").
  21. -ifdef(TEST).
  22. -include_lib("eunit/include/eunit.hrl").
  23. -endif.
  24. -export([
  25. describe/1,
  26. plugin_schema_json/1,
  27. plugin_i18n_json/1,
  28. raw_plugin_config_content/1,
  29. parse_name_vsn/1,
  30. make_name_vsn_string/2
  31. ]).
  32. %% Package operations
  33. -export([
  34. ensure_installed/0,
  35. ensure_installed/1,
  36. ensure_installed/2,
  37. ensure_uninstalled/1,
  38. ensure_enabled/1,
  39. ensure_enabled/2,
  40. ensure_enabled/3,
  41. ensure_disabled/1,
  42. purge/1,
  43. delete_package/1
  44. ]).
  45. %% Plugin runtime management
  46. -export([
  47. ensure_started/0,
  48. ensure_started/1,
  49. ensure_stopped/0,
  50. ensure_stopped/1,
  51. restart/1,
  52. list/0,
  53. list/1
  54. ]).
  55. %% Plugin config APIs
  56. -export([
  57. get_config/1,
  58. get_config/2,
  59. get_config/3,
  60. get_config/4,
  61. put_config/3
  62. ]).
  63. %% Package utils
  64. -export([
  65. decode_plugin_config_map/2,
  66. install_dir/0,
  67. avsc_file_path/1,
  68. md5sum_file/1,
  69. with_plugin_avsc/1,
  70. ensure_ssl_files/2,
  71. ensure_ssl_files/3
  72. ]).
  73. %% `emqx_config_handler' API
  74. -export([
  75. post_config_update/5
  76. ]).
  77. %% RPC call
  78. -export([get_tar/1]).
  79. %% Internal export
  80. -export([
  81. ensure_config_map/1,
  82. do_ensure_started/1
  83. ]).
  84. %% for test cases
  85. -export([put_config_internal/2]).
  86. -ifdef(TEST).
  87. -compile(export_all).
  88. -compile(nowarn_export_all).
  89. -endif.
  90. %% Defines
  91. -define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
  92. -define(RAW_BIN, binary).
  93. -define(JSON_MAP, json_map).
  94. -define(MAX_KEEP_BACKUP_CONFIGS, 10).
  95. %%--------------------------------------------------------------------
  96. %% APIs
  97. %%--------------------------------------------------------------------
  %% @doc Describe a plugin.
  %% Reads the plugin info for `NameVsn' with the `fill_readme' option switched on.
  -spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
  describe(NameVsn) ->
      Options = #{fill_readme => true},
      read_plugin_info(NameVsn, Options).
  %% @doc Read the plugin's Avro schema file (see read_plugin_avsc/1).
  -spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
  plugin_schema_json(NameVsn) ->
      Result = read_plugin_avsc(NameVsn),
      Result.
  %% @doc Read the plugin's i18n file (see read_plugin_i18n/1).
  -spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
  plugin_i18n_json(NameVsn) ->
      Result = read_plugin_i18n(NameVsn),
      Result.
  %% @doc Read the plugin's HOCON config file as raw content (see read_plugin_hocon/1).
  -spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
  raw_plugin_config_content(NameVsn) ->
      Result = read_plugin_hocon(NameVsn),
      Result.
  %% Split a `<name>-<vsn>' binary or string at the FIRST dash.
  %% Returns `{ok, NameAtom, VsnString}', or `{error, "bad_name_vsn"}' when
  %% the input contains no dash.
  %% NOTE(review): the name part is turned into an atom with list_to_atom/1,
  %% so callers should only pass trusted input.
  parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
      parse_name_vsn(binary_to_list(NameVsn));
  parse_name_vsn(NameVsn) when is_list(NameVsn) ->
      IsNotDash = fun(Char) -> Char =/= $- end,
      {Prefix, Suffix} = lists:splitwith(IsNotDash, NameVsn),
      case Suffix of
          [$- | Vsn] -> {ok, list_to_atom(Prefix), Vsn};
          _ -> {error, "bad_name_vsn"}
      end.
  %% Build the `<name>-<vsn>' string (a list) from iodata name and version.
  make_name_vsn_string(Name, Vsn) ->
      NameVsnBin = iolist_to_binary([Name, "-", Vsn]),
      binary_to_list(NameVsnBin).
  %% Find the single entry of `Apps' that starts with `AppName'.
  %% Returns `{ok, AppNameVsn}' only when exactly one entry has that prefix;
  %% zero or several matches both yield `{error, not_found}'.
  app_dir(AppName, Apps) ->
      Matches = [
          Candidate
       || Candidate <- Apps, string:prefix(Candidate, AppName) =/= nomatch
      ],
      case Matches of
          [AppNameVsn] -> {ok, AppNameVsn};
          _ -> {error, not_found}
      end.
  132. %%--------------------------------------------------------------------
  133. %% Package operations
  %% @doc Run ensure_installed/1 for every plugin item passed in by for_plugins/1.
  %% A plugin that installs cleanly contributes nothing; a failing one
  %% contributes a `{NameVsn, Reason}' entry to the list handed back to
  %% for_plugins/1.
  -spec ensure_installed() -> ok.
  ensure_installed() ->
      InstallOne = fun(#{name_vsn := NameVsn}) ->
          Outcome = ensure_installed(NameVsn),
          case Outcome of
              ok -> [];
              {error, Reason} -> [{NameVsn, Reason}]
          end
      end,
      ok = for_plugins(InstallOne).
  %% @doc Install a .tar.gz package placed in install_dir.
  %% When the plugin info can already be read, only the plugin config is
  %% (re-)ensured. Otherwise any leftovers are purged and the package is
  %% installed again, followed by the post-install step.
  %% Always returns `ok' on success, as promised by the spec; the result of
  %% maybe_ensure_plugin_config/2 is deliberately discarded.
  -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
  ensure_installed(NameVsn) ->
      case read_plugin_info(NameVsn, #{}) of
          {ok, _} ->
              _ = maybe_ensure_plugin_config(NameVsn, ?normal),
              ok;
          {error, _} ->
              ok = purge(NameVsn),
              case ensure_exists_and_installed(NameVsn) of
                  ok ->
                      maybe_post_op_after_installed(NameVsn, ?normal),
                      ok;
                  {error, _Reason} = Err ->
                      Err
              end
      end.
  %% Install in `?fresh_install' mode: the package is installed without the
  %% "already installed" check or purge done by ensure_installed/1, then the
  %% post-install step runs with the same mode.
  ensure_installed(NameVsn, ?fresh_install = Mode) ->
      InstallResult = ensure_exists_and_installed(NameVsn),
      case InstallResult of
          ok ->
              maybe_post_op_after_installed(NameVsn, Mode),
              ok;
          {error, _Reason} = Err ->
              Err
      end.
  %% @doc Ensure the files and directories of the given plugin are deleted.
  %% An error is returned if the plugin is not stopped, or is still enabled.
  %% In every other case (including unreadable plugin info) the plugin is
  %% purged and dropped from the configured list.
  -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
  ensure_uninstalled(NameVsn) ->
      PluginInfo = read_plugin_info(NameVsn, #{}),
      case PluginInfo of
          {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
              {error, #{
                  msg => "bad_plugin_running_status",
                  hint => "stop_the_plugin_first"
              }};
          {ok, #{config_status := enabled}} ->
              {error, #{
                  msg => "bad_plugin_config_status",
                  hint => "disable_the_plugin_first"
              }};
          _ ->
              purge(NameVsn),
              ensure_delete(NameVsn)
      end.
  %% @doc Ensure a plugin is enabled, keeping its current position
  %% (`no_move'); the config is written to the `local' location.
  -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
  ensure_enabled(NameVsn) ->
      Position = no_move,
      ensure_enabled(NameVsn, Position).

  %% @doc Ensure a plugin is enabled at the given position of the plugin list.
  -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position) ->
      Enabled = true,
      ConfLocation = local,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).

  %% @doc Same as ensure_enabled/2, with an explicit config location.
  -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
  ensure_enabled(NameVsn, Position, ConfLocation) when
      ConfLocation =:= local orelse ConfLocation =:= global
  ->
      Enabled = true,
      ensure_state(NameVsn, Position, Enabled, ConfLocation).
  %% @doc Ensure a plugin is disabled.
  %% The plugin keeps its position and the change is written to the `local'
  %% config location.
  -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
  ensure_disabled(NameVsn) ->
      Enabled = false,
      ConfLocation = local,
      ensure_state(NameVsn, no_move, Enabled, ConfLocation).
  %% @doc Delete the extracted plugin directory and drop the cached config.
  %% In case one lib is shared by multiple plugins,
  %% purging one plugin's install dir might delete loaded beams.
  %% That should not be a problem, because a shared lib should
  %% reside in all the plugin install dirs.
  %%
  %% Returns `{error, Reason}' when the directory could not be removed
  %% (see purge_plugin_dir/1); the spec reflects that.
  -spec purge(name_vsn()) -> ok | {error, term()}.
  purge(NameVsn) ->
      _ = maybe_purge_plugin_config(NameVsn),
      purge_plugin(NameVsn).
  %% @doc Delete the package file and the plugin's registered schema.
  %% A missing package file counts as success. Any other file error is
  %% logged and returned as `{error, Reason}', which the spec now admits.
  -spec delete_package(name_vsn()) -> ok | {error, term()}.
  delete_package(NameVsn) ->
      File = pkg_file_path(NameVsn),
      _ = emqx_plugins_serde:delete_schema(NameVsn),
      case file:delete(File) of
          ok ->
              %% NOTE(review): the message name says "dir" although a package
              %% file is deleted here; kept unchanged for log compatibility.
              ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
              ok;
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_delete_package_file",
                  path => File,
                  reason => Reason
              }),
              {error, Reason}
      end.
  234. %%--------------------------------------------------------------------
  235. %% Plugin runtime management
  %% @doc Start every enabled plugin passed in by for_plugins/1.
  %% Disabled plugins are only logged at debug level. A plugin that fails to
  %% start contributes a `{NameVsn, Reason}' entry to the list returned to
  %% for_plugins/1.
  -spec ensure_started() -> ok.
  ensure_started() ->
      StartOne = fun
          (#{name_vsn := NameVsn, enable := true}) ->
              StartResult = do_ensure_started(NameVsn),
              case StartResult of
                  ok -> [];
                  {error, Reason} -> [{NameVsn, Reason}]
              end;
          (#{name_vsn := NameVsn, enable := false}) ->
              ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
              []
      end,
      ok = for_plugins(StartOne).
  %% @doc Start a plugin from Management API or CLI.
  %% The input is a `<name>-<vsn>' string. A failure is logged at error level
  %% and returned unchanged.
  -spec ensure_started(name_vsn()) -> ok | {error, term()}.
  ensure_started(NameVsn) ->
      StartResult = do_ensure_started(NameVsn),
      case StartResult of
          ok ->
              ok;
          {error, ReasonMap} ->
              ?SLOG(error, ReasonMap#{msg => "failed_to_start_plugin"}),
              {error, ReasonMap}
      end.
  %% @doc Stop all plugins before the broker stops.
  %% Only enabled plugins are stopped; disabled ones are logged at debug
  %% level. Failures are collected as `{NameVsn, Reason}' entries.
  -spec ensure_stopped() -> ok.
  ensure_stopped() ->
      StopOne = fun
          (#{name_vsn := NameVsn, enable := true}) ->
              StopResult = ensure_stopped(NameVsn),
              case StopResult of
                  ok ->
                      [];
                  {error, Reason} ->
                      [{NameVsn, Reason}]
              end;
          (#{name_vsn := NameVsn, enable := false}) ->
              ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
              []
      end,
      ok = for_plugins(StopOne).
  %% @doc Stop a plugin from Management API or CLI.
  %% The plugin info is read and its apps are stopped inside tryit/2, so
  %% throws and errors come back as `{error, _}'.
  -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
  ensure_stopped(NameVsn) ->
      StopApps = fun() ->
          ensure_apps_stopped(do_read_plugin(NameVsn))
      end,
      tryit("stop_plugin", StopApps).
  %% Same as get_config/3, with the plugin given as separate name and version.
  get_config(Name, Vsn, Opt, Default) ->
      NameVsn = make_name_vsn_string(Name, Vsn),
      get_config(NameVsn, Opt, Default).
  %% Get the plugin config in map format, defaulting to an empty map.
  -spec get_config(name_vsn()) ->
      {ok, plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn) ->
      get_config(NameVsn, ?CONFIG_FORMAT_MAP).

  %% Get the plugin config in the requested format: the cached map
  %% (default `#{}') or the raw content of the plugin's config file.
  -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
      {ok, raw_plugin_config_content() | plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
      get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{});
  get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
      get_config_bin(NameVsn).

  %% Present default config value only in map format.
  %% The map is looked up in persistent_term under the binary form of NameVsn.
  -spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
      {ok, plugin_config_map() | any()}
      | {error, term()}.
  get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
      Key = ?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)),
      {ok, persistent_term:get(Key, Default)}.
  %% Read the raw config file content of a plugin.
  %% There is no default value when reading the raw binary config.
  get_config_bin(NameVsn) ->
      ReadResult = read_plugin_hocon(NameVsn),
      case ReadResult of
          {ok, _Content} -> ReadResult;
          {error, _Reason} -> ReadResult
      end.
  %% @doc Update a plugin's config.
  %% RPC call from Management API or CLI.
  %% Callers are expected to pass a config map that is already valid: checked
  %% against the plugin's avro schema when it has one, otherwise trusted as is.
  %%
  %% Order matters: the HOCON file is backed up and written first, then the
  %% plugin callback runs (it reads the OLD config from the cache), and only
  %% after that is the cached config replaced.
  put_config(NameVsn, ConfigJsonMap, _AvroValue) when is_binary(NameVsn) ->
      PrettyHocon = hocon_pp:do(ConfigJsonMap, #{}),
      ok = backup_and_write_hocon_bin(NameVsn, PrettyHocon),
      %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
      ok = maybe_call_on_config_changed(NameVsn, ConfigJsonMap),
      CacheKey = ?PLUGIN_PERSIS_CONFIG_KEY(NameVsn),
      ok = persistent_term:put(CacheKey, ConfigJsonMap),
      ok;
  put_config(NameVsn, ConfigJsonMap, AvroValue) ->
      %% normalize the plugin reference to a binary first
      put_config(bin(NameVsn), ConfigJsonMap, AvroValue).
  %% @doc Stop and then start the plugin.
  %% The start is skipped when stopping fails; that error is returned as is.
  restart(NameVsn) ->
      StopResult = ensure_stopped(NameVsn),
      case StopResult of
          ok -> ensure_started(NameVsn);
          {error, _Reason} = Err -> Err
      end.
  %% @doc Call the plugin's callback on_config_changed/2, if it exports one.
  %% The callback receives the currently cached config and the new config.
  %% Any exception raised by the callback is logged and swallowed, so a
  %% faulty plugin cannot fail the config update.
  maybe_call_on_config_changed(NameVsn, NewConf) ->
      maybe
          {ok, AppMod} ?= app_module_name(NameVsn),
          true ?= erlang:function_exported(AppMod, on_config_changed, 2),
          {ok, OldConf} = get_config(NameVsn),
          try AppMod:on_config_changed(OldConf, NewConf) of
              _Ignored -> ok
          catch
              Class:CatchReason:Stacktrace ->
                  ?SLOG(error, #{
                      msg => "failed_to_call_on_config_changed",
                      exception => Class,
                      reason => CatchReason,
                      stacktrace => Stacktrace
                  }),
                  ok
          end
      else
          {error, Reason} ->
              %% the plugin's app module could not be resolved
              ?SLOG(info, #{msg => "failed_to_call_on_config_changed", reason => Reason});
          false ->
              ?SLOG(info, #{msg => "on_config_changed_callback_not_exported"});
          _ ->
              ok
      end.
  %% Resolve the plugin's application module, `<name>_app', as an existing atom.
  %% A failure to read the plugin info is logged and returned.
  app_module_name(NameVsn) ->
      PluginInfo = read_plugin_info(NameVsn, #{}),
      case PluginInfo of
          {ok, #{<<"name">> := Name}} ->
              ModuleBin = <<Name/binary, "_app">>,
              emqx_utils:safe_to_existing_atom(ModuleBin);
          {error, Reason} ->
              ?SLOG(error, Reason#{msg => "failed_to_read_plugin_info"}),
              {error, Reason}
      end.
  %% @doc List all installed plugins of type `normal'.
  %% Including the ones that are installed, but not enabled in config.
  -spec list() -> [plugin_info()].
  list() ->
      Type = normal,
      list(Type).
  %% List installed plugins of the given type.
  %% Every `<install_dir>/*/release.json' is treated as one installed plugin;
  %% the directory name is taken as its NameVsn. Plugins whose info cannot be
  %% read are logged and left out. Ordering is delegated to do_list/2.
  -spec list(all | normal | hidden) -> [plugin_info()].
  list(Type) ->
      Pattern = filename:join([install_dir(), "*", "release.json"]),
      ReadOne = fun(JsonFilePath) ->
          %% second-to-last path component is the plugin directory
          [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)),
          case read_plugin_info(NameVsn, #{}) of
              {ok, Info} ->
                  filter_plugin_of_type(Type, Info);
              {error, Reason} ->
                  ?SLOG(warning, Reason#{msg => "failed_to_read_plugin_info"}),
                  false
          end
      end,
      All = lists:filtermap(ReadOne, filelib:wildcard(Pattern)),
      do_list(configured(), All).
  %% lists:filtermap/2 predicate: keep `Info' when it fits the wanted type.
  %% `all' keeps everything, `normal' drops plugins flagged
  %% `<<"hidden">> => true', `hidden' keeps only those.
  filter_plugin_of_type(all, Info) ->
      {true, Info};
  filter_plugin_of_type(Type, Info) when Type =:= normal; Type =:= hidden ->
      IsHidden =
          case Info of
              #{<<"hidden">> := true} -> true;
              _ -> false
          end,
      case {Type, IsHidden} of
          {normal, false} -> {true, Info};
          {hidden, true} -> {true, Info};
          _ -> false
      end.
  399. %%--------------------------------------------------------------------
  400. %% Package utils
  %% Decode a plugin config against the plugin's avro schema.
  %% Plugins without a config schema yield `{ok, ?plugin_without_config_schema}'.
  %% A plugin that declares a schema but has no serde registered is an error.
  -spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
      {ok, map() | ?plugin_without_config_schema}
      | {error, any()}.
  decode_plugin_config_map(NameVsn, AvroJsonMap) ->
      HasSchema = with_plugin_avsc(NameVsn),
      case HasSchema of
          false ->
              ?SLOG(debug, #{
                  msg => "plugin_without_config_schema",
                  name_vsn => NameVsn
              }),
              {ok, ?plugin_without_config_schema};
          true ->
              case emqx_plugins_serde:lookup_serde(NameVsn) of
                  {ok, _Serde} ->
                      do_decode_plugin_config_map(NameVsn, AvroJsonMap);
                  {error, not_found} ->
                      Reason = "plugin_config_schema_serde_not_found",
                      ?SLOG(error, #{
                          msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
                      }),
                      {error, Reason}
              end
      end.
  %% Decode the config with the plugin's serde.
  %% A map is JSON-encoded first; anything else is passed to the serde as is.
  do_decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
      AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
      do_decode_plugin_config_map(NameVsn, AvroJsonBin);
  do_decode_plugin_config_map(NameVsn, AvroJsonBin) ->
      case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
          {ok, _Config} = Decoded -> Decoded;
          {error, _ReasonMap} = Err -> Err
      end.
  %% Tell whether the plugin info declares a config schema.
  %% Anything but a readable info with a boolean `with_config_schema'
  %% counts as `false'.
  -spec with_plugin_avsc(name_vsn()) -> boolean().
  with_plugin_avsc(NameVsn) ->
      PluginInfo = read_plugin_info(NameVsn, #{fill_readme => false}),
      case PluginInfo of
          {ok, #{<<"with_config_schema">> := Flag}} when is_boolean(Flag) ->
              Flag;
          _ ->
              false
      end.
  %% Read a value below the plugins config root; a single atom key is
  %% treated as a one-element path.
  get_config_interal(Key, Default) when is_atom(Key) ->
      get_config_interal([Key], Default);
  get_config_interal(Path, Default) ->
      FullPath = [?CONF_ROOT | Path],
      emqx_conf:get(FullPath, Default).
  %% Write a plugins config value to the `local' config location.
  put_config_internal(Key, Value) ->
      ConfLocation = local,
      do_put_config_internal(Key, Value, ConfLocation).
  %% Return the content of the plugin's package file (RPC target).
  %% When the package file cannot be read, a tarball is re-created from the
  %% plugin's installed directory and read back.
  %% The error type in the spec is `any()'; the bare atom `any' written
  %% before only covered the literal atom.
  -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any()}.
  get_tar(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      case file:read_file(TarGz) of
          {ok, Content} ->
              {ok, Content};
          {error, _} ->
              case maybe_create_tar(NameVsn, TarGz, install_dir()) of
                  ok ->
                      file:read_file(TarGz);
                  Err ->
                      Err
              end
      end.
  %% Ensure SSL files for a plugin, using the plugin's own certs directory.
  ensure_ssl_files(NameVsn, SSL) ->
      CertsDir = plugin_certs_dir(NameVsn),
      emqx_tls_lib:ensure_ssl_files(CertsDir, SSL).

  %% Same as ensure_ssl_files/2, passing `Opts' through to emqx_tls_lib.
  ensure_ssl_files(NameVsn, SSL, Opts) ->
      CertsDir = plugin_certs_dir(NameVsn),
      emqx_tls_lib:ensure_ssl_files(CertsDir, SSL, Opts).
  463. %%--------------------------------------------------------------------
  464. %% Internal
  465. %%--------------------------------------------------------------------
  %% Create a compressed tarball from the plugin's installed files.
  %% Each file is stored under its path with the install dir prefix removed.
  %% Returns `{error, plugin_not_found}' when the plugin dir has no files.
  maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
      maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
  maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
      Pattern = filename:join(plugin_dir(NameVsn), "**"),
      case filelib:wildcard(Pattern) of
          [_ | _] = PluginFiles ->
              %% exactly one trailing slash, so that prefix removal leaves a
              %% relative member name
              Prefix = string:trim(InstallDir, trailing, "/") ++ "/",
              Members = lists:map(
                  fun(File) -> {string:prefix(File, Prefix), File} end,
                  PluginFiles
              ),
              erl_tar:create(TarGzName, Members, [compressed]);
          _ ->
              {error, plugin_not_found}
      end.
  %% Write the in-memory tar members `[{Name, Bin}]' below `BaseDir'.
  %%
  %% Member names come straight from the package. A name that is absolute or
  %% climbs out of `BaseDir' with `..' must never be written, so each name is
  %% checked with filelib:safe_relative_path/2 first.
  %% Raises `error({unsafe_tar_entry_path, Name})' for such a member; files
  %% written before the offending member are left in place.
  write_tar_file_content(BaseDir, TarContent) ->
      lists:foreach(
          fun({Name, Bin}) ->
              case filelib:safe_relative_path(Name, BaseDir) of
                  unsafe ->
                      error({unsafe_tar_entry_path, Name});
                  SafeName ->
                      Filename = filename:join(BaseDir, SafeName),
                      ok = filelib:ensure_dir(Filename),
                      ok = file:write_file(Filename, Bin)
              end
          end,
          TarContent
      ).
  %% Undo write_tar_file_content/2: for every tar member that still exists,
  %% recursively delete the top-level entry below `BaseDir' that contains it.
  delete_tar_file_content(BaseDir, TarContent) ->
      DeleteOne = fun({Name, _Bin}) ->
          Filename = filename:join(BaseDir, Name),
          case filelib:is_file(Filename) of
              false ->
                  %% probably already deleted
                  ok;
              true ->
                  TopDirOrFile = top_dir(BaseDir, Filename),
                  ok = file:del_dir_r(TopDirOrFile)
          end
      end,
      lists:foreach(DeleteOne, TarContent).
  %% Walk up from `DirOrFile' until its parent is `BaseDir' and return that
  %% ancestor, i.e. the entry directly below `BaseDir'.
  %% Throws `{out_of_bounds, Path}' when the walk reaches "/" or "." without
  %% passing through `BaseDir'.
  top_dir(BaseDir0, DirOrFile) ->
      %% get rid of a possible trailing slash so the comparison below works
      BaseDir = filename:join([BaseDir0, ""]),
      case filename:dirname(DirOrFile) of
          "/" ->
              throw({out_of_bounds, DirOrFile});
          "." ->
              throw({out_of_bounds, DirOrFile});
          BaseDir ->
              DirOrFile;
          Parent ->
              top_dir(BaseDir, Parent)
      end.
  %% Normalize a directory name: joining with an empty component gets rid of
  %% a possible trailing slash.
  normalize_dir(Dir) ->
      Components = [Dir, ""],
      filename:join(Components).
  -ifdef(TEST).
  %% normalize_dir/1 drops a trailing slash and leaves other paths alone.
  normalize_dir_test_() ->
      Cases = [
          {"foo", "foo"},
          {"foo", "foo/"},
          {"/foo", "/foo"},
          {"/foo", "/foo/"}
      ],
      [?_assertEqual(Expected, normalize_dir(Input)) || {Expected, Input} <- Cases].

  %% top_dir/2 returns the entry directly below the base dir, and throws
  %% when the path is the base dir itself or lies outside of it.
  top_dir_test_() ->
      Nested = filename:join(["base", "foo", "bar"]),
      AbsNested = filename:join(["/", "base", "foo", "bar"]),
      [
          ?_assertEqual("base/foo", top_dir("base", Nested)),
          ?_assertEqual("/base/foo", top_dir("/base", AbsNested)),
          ?_assertEqual("/base/foo", top_dir("/base/", AbsNested)),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
          ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
      ].
  -endif.
  %% Extract the plugin package into the install dir and verify the result.
  %% If the plugin info cannot be read after extraction, the extracted files
  %% are removed again and the read error is returned.
  do_ensure_installed(NameVsn) ->
      TarGz = pkg_file_path(NameVsn),
      Extracted = erl_tar:extract(TarGz, [compressed, memory]),
      case Extracted of
          {ok, TarContent} ->
              ok = write_tar_file_content(install_dir(), TarContent),
              case read_plugin_info(NameVsn, #{}) of
                  {ok, _Info} ->
                      ok;
                  {error, Reason} ->
                      ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
                      ok = delete_tar_file_content(install_dir(), TarContent),
                      {error, Reason}
              end;
          {error, {_, enoent}} ->
              %% the package file itself is missing
              {error, #{
                  msg => "failed_to_extract_plugin_package",
                  path => TarGz,
                  reason => plugin_tarball_not_found
              }};
          {error, Reason} ->
              {error, #{
                  msg => "bad_plugin_package",
                  path => TarGz,
                  reason => Reason
              }}
      end.
  %% Drop the plugin from the configured list; names are compared as binaries.
  ensure_delete(NameVsn0) ->
      NameVsn = bin(NameVsn0),
      Configured = configured(),
      IsOther = fun(#{name_vsn := Other}) -> bin(Other) =/= NameVsn end,
      Remaining = lists:filter(IsOther, Configured),
      put_configured(Remaining),
      ok.
  %% Record the enable/disable state of an installed plugin in the config.
  %% The plugin info must be readable; otherwise the read error is logged and
  %% returned. Throws raised while updating the list come back as
  %% `{error, _}' through tryit/2.
  ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
      ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
  ensure_state(NameVsn, Position, State, ConfLocation) ->
      PluginInfo = read_plugin_info(NameVsn, #{}),
      case PluginInfo of
          {ok, _} ->
              Item = #{name_vsn => NameVsn, enable => State},
              Configure = fun() -> ensure_configured(Item, Position, ConfLocation) end,
              tryit("ensure_state", Configure);
          {error, Reason} ->
              ?SLOG(error, #{msg => "ensure_plugin_states_failed", reason => Reason}),
              {error, Reason}
      end.
  %% Put `Item' into the configured plugin list and store the list.
  %% An already configured plugin is replaced in place for `no_move',
  %% otherwise it is taken out and re-inserted at `Position'. A plugin that
  %% is not configured yet is inserted at `Position'.
  ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
      Configured = configured(),
      IsOther = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      NewConfigured =
          case lists:splitwith(IsOther, Configured) of
              {Front, [_Old | More]} when Position =:= no_move ->
                  Front ++ [Item | More];
              {Front, [_Old | More]} ->
                  add_new_configured(Front ++ More, Position, Item);
              {_Front, []} ->
                  add_new_configured(Configured, Position, Item)
          end,
      ok = put_configured(NewConfigured, ConfLocation).
  %% Insert `Item' into `Configured' at the given position:
  %% `front', `rear' (also used for `no_move'), or `{before | behind, Anchor}'
  %% relative to an already configured plugin.
  %% Throws a reason map when the anchor plugin is not configured.
  add_new_configured(Configured, no_move, Item) ->
      %% default to rear
      add_new_configured(Configured, rear, Item);
  add_new_configured(Configured, front, Item) ->
      [Item | Configured];
  add_new_configured(Configured, rear, Item) ->
      Configured ++ [Item];
  add_new_configured(Configured, {Action, NameVsn}, Item) ->
      IsNotAnchor = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
      case lists:splitwith(IsNotAnchor, Configured) of
          {_Front, []} ->
              throw(#{
                  msg => "position_anchor_plugin_not_configured",
                  hint => "maybe_install_and_configure",
                  name_vsn => NameVsn
              });
          {Front, [Anchor | AfterAnchor] = Rear} ->
              case Action of
                  before ->
                      Front ++ [Item | Rear];
                  behind ->
                      Front ++ [Anchor, Item | AfterAnchor]
              end
      end.
  %% Drop the plugin's cached config map from persistent_term.
  %% The cache is keyed by the BINARY form of NameVsn (see put_config/3 and
  %% get_config/3), so the name is normalized here as well; otherwise a
  %% string NameVsn would leave the cached config behind.
  maybe_purge_plugin_config(NameVsn) ->
      _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn))),
      ok.
  %% Remove the plugin's directory.
  purge_plugin(NameVsn) ->
      purge_plugin_dir(plugin_dir(NameVsn)).
  %% Recursively delete `Dir'. A missing directory is not an error; any
  %% other failure is logged and returned as `{error, Reason}'.
  purge_plugin_dir(Dir) ->
      DeleteResult = file:del_dir_r(Dir),
      case DeleteResult of
          ok ->
              ?SLOG(info, #{
                  msg => "purged_plugin_dir",
                  dir => Dir
              });
          {error, enoent} ->
              ok;
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_purge_plugin_dir",
                  dir => Dir,
                  reason => Reason
              }),
              {error, Reason}
      end.
  %% Make sure configured ones are ordered in front.
  %% Walks the configured list in order and moves each matching plugin info
  %% (matched on `<name>-<rel_vsn>') to the front; configured entries with no
  %% installed counterpart are skipped, the rest keep their order.
  do_list([], All) ->
      All;
  do_list([#{name_vsn := NameVsn} | Rest], All) ->
      Wanted = bin(NameVsn),
      IsOther = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
          bin([Name, "-", Vsn]) =/= Wanted
      end,
      case lists:splitwith(IsOther, All) of
          {_, []} ->
              do_list(Rest, All);
          {Front, [Found | Rear]} ->
              [Found | do_list(Rest, Front ++ Rear)]
      end.
  %% Make sure the plugin is installed, then load its code and start its apps.
  %% Runs inside tryit/2, so throws and errors become `{error, _}'.
  do_ensure_started(NameVsn) ->
      StartPlugin = fun() ->
          case ensure_exists_and_installed(NameVsn) of
              ok ->
                  Plugin = do_read_plugin(NameVsn),
                  ok = load_code_start_apps(NameVsn, Plugin);
              {error, #{reason := Reason} = ReasonMap} ->
                  ?SLOG(error, #{
                      msg => "failed_to_start_plugin",
                      name_vsn => NameVsn,
                      reason => Reason
                  }),
                  {error, ReasonMap}
          end
      end,
      tryit("start_plugins", StartPlugin).
  670. %%--------------------------------------------------------------------
  %% Run `F' and translate exceptions into `{error, Map}' return values:
  %% `throw' exceptions are treated as known errors and returned as is (a
  %% non-map reason is wrapped in `#{reason => Reason}'); `error' exceptions
  %% are unexpected, so they are logged with their stacktrace first.
  %% `exit' exceptions are not caught.
  tryit(WhichOp, F) ->
      try
          F()
      catch
          throw:ReasonMap when is_map(ReasonMap) ->
              %% known error, no stacktrace needed
              {error, ReasonMap};
          throw:Reason ->
              {error, #{reason => Reason}};
          error:Reason:Stacktrace ->
              %% unexpected error, log the stacktrace
              Details = #{
                  which_op => WhichOp,
                  exception => Reason,
                  stacktrace => Stacktrace
              },
              ?SLOG(warning, Details#{msg => "plugin_op_failed"}),
              {error, Details}
      end.
  %% Read the plugin info from its JSON file.
  %% Returns `{ok, Info}' or `{error, Reason}'; exceptions raised while
  %% reading are converted by tryit/2.
  read_plugin_info(NameVsn, Options) ->
      WhichOp = atom_to_list(?FUNCTION_NAME),
      ReadInfo = fun() -> {ok, do_read_plugin(NameVsn, Options)} end,
      tryit(WhichOp, ReadInfo).
  %% Read the plugin info with default options.
  do_read_plugin(NameVsn) ->
      DefaultOptions = #{},
      do_read_plugin(NameVsn, DefaultOptions).

  %% Read the plugin info from the plugin's own info file.
  do_read_plugin(NameVsn, Option) ->
      InfoFilePath = info_file_path(NameVsn),
      do_read_plugin(NameVsn, InfoFilePath, Option).

  %% Read and check the info file, then add the readme, package info and
  %% status, in that order. Crashes (badmatch) when the file cannot be read.
  do_read_plugin(NameVsn, InfoFilePath, Options) ->
      ReadInfoFile = read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}),
      {ok, PlainMap} = ReadInfoFile(),
      Checked = check_plugin(PlainMap, NameVsn, InfoFilePath),
      WithReadme = plugins_readme(NameVsn, Options, Checked),
      WithPackageInfo = plugins_package_info(NameVsn, WithReadme),
      plugin_status(NameVsn, WithPackageInfo).
  %% Read the plugin's avsc file as a JSON map.
  read_plugin_avsc(NameVsn) ->
      Options = #{read_mode => ?JSON_MAP},
      read_plugin_avsc(NameVsn, Options).

  %% Read the plugin's avsc file with the given read options, under tryit/2.
  read_plugin_avsc(NameVsn, Options) ->
      WhichOp = atom_to_list(?FUNCTION_NAME),
      ReadFile = read_file_fun(avsc_file_path(NameVsn), "bad_avsc_file", Options),
      tryit(WhichOp, ReadFile).
  %% Read the plugin's i18n file as a JSON map.
  read_plugin_i18n(NameVsn) ->
      Options = #{read_mode => ?JSON_MAP},
      read_plugin_i18n(NameVsn, Options).

  %% Read the plugin's i18n file with the given read options, under tryit/2.
  read_plugin_i18n(NameVsn, Options) ->
      WhichOp = atom_to_list(?FUNCTION_NAME),
      ReadFile = read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options),
      tryit(WhichOp, ReadFile).
  %% Read the plugin's config file, by default as a raw binary.
  read_plugin_hocon(NameVsn) ->
      DefaultOptions = #{read_mode => ?RAW_BIN},
      read_plugin_hocon(NameVsn, DefaultOptions).

  %% Read the plugin's config file using the read mode given in `Options'.
  %% Read failures are converted into `{error, _}' by tryit/2.
  read_plugin_hocon(NameVsn, Options) ->
      OpName = atom_to_list(?FUNCTION_NAME),
      Reader = read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options),
      tryit(OpName, Reader).
  %% Make sure the plugin directory of `NameVsn' exists on this node.
  %% When the directory is missing, the package is taken from get_tar/1 if
  %% that succeeds, otherwise it is fetched from the other cluster nodes.
  ensure_exists_and_installed(NameVsn) ->
      AlreadyExtracted = filelib:is_dir(plugin_dir(NameVsn)),
      case AlreadyExtracted of
          false ->
              %% The package may be available without being extracted yet.
              case get_tar(NameVsn) of
                  {ok, Tarball} ->
                      PkgFile = pkg_file_path(NameVsn),
                      ok = file:write_file(PkgFile, Tarball),
                      do_ensure_installed(NameVsn);
                  _ ->
                      %% Not available locally: ask the cluster.
                      do_get_from_cluster(NameVsn)
              end;
          true ->
              ok
      end.
  %% Fetch the plugin package from any other running node, store it in the
  %% package file and install it. When no node can provide the package the
  %% failure is logged and returned as `{error, ErrMeta}'.
  do_get_from_cluster(NameVsn) ->
      ThisNode = node(),
      Peers = lists:filter(fun(N) -> N /= ThisNode end, mria:running_nodes()),
      case get_plugin_tar_from_any_node(Peers, NameVsn, []) of
          {ok, Tarball} ->
              ok = file:write_file(pkg_file_path(NameVsn), Tarball),
              ok = do_ensure_installed(NameVsn);
          {error, NodeErrors} ->
              ErrMeta =
                  case Peers of
                      [] ->
                          %% there was nobody to ask
                          #{
                              msg => "no_nodes_to_copy_plugin_from",
                              name_vsn => NameVsn,
                              reason => plugin_not_found
                          };
                      _ ->
                          %% every peer was asked and all of them failed
                          #{
                              msg => "failed_to_copy_plugin_from_other_nodes",
                              name_vsn => NameVsn,
                              node_errors => NodeErrors,
                              reason => plugin_not_found
                          }
                  end,
              ?SLOG(error, ErrMeta),
              {error, ErrMeta}
      end.
  %% Ask the given nodes one by one for the plugin package.
  %% Returns the first `{ok, _}' answer; when the node list is exhausted,
  %% returns `{error, Errors}' with the collected `{Node, Answer}' pairs
  %% (most recently tried node first).
  get_plugin_tar_from_any_node([Node | Rest], NameVsn, Errors) ->
      Answer = emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity),
      case Answer of
          {ok, _} ->
              ?SLOG(debug, #{
                  msg => "get_plugin_tar_from_cluster_successfully",
                  node => Node,
                  name_vsn => NameVsn
              }),
              Answer;
          _ ->
              get_plugin_tar_from_any_node(Rest, NameVsn, [{Node, Answer} | Errors])
      end;
  get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
      {error, Errors}.
  %% Ask the given nodes one by one for the plugin config (map format,
  %% 5 second timeout per call).
  %% Returns the first `{ok, _}' answer; when the node list is exhausted,
  %% returns `{error, Errors}' with the collected `{Node, Answer}' pairs
  %% (most recently tried node first).
  get_plugin_config_from_any_node([Node | Rest], NameVsn, Errors) ->
      Answer = emqx_plugins_proto_v2:get_config(
          Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
      ),
      case Answer of
          {ok, _} ->
              ?SLOG(debug, #{
                  msg => "get_plugin_config_from_cluster_successfully",
                  node => Node,
                  name_vsn => NameVsn
              }),
              Answer;
          _ ->
              get_plugin_config_from_any_node(Rest, NameVsn, [{Node, Answer} | Errors])
      end;
  get_plugin_config_from_any_node([], _NameVsn, Errors) ->
      {error, Errors}.
  %% Add the `md5sum' key to `Info': the content of the plugin's md5sum
  %% file, or an empty binary when that file cannot be read.
  plugins_package_info(NameVsn, Info) ->
      MD5 =
          case file:read_file(md5sum_file(NameVsn)) of
              {ok, Content} -> Content;
              _ -> <<>>
          end,
      Info#{md5sum => MD5}.
  %% When the options contain `fill_readme := true', add the `readme' key
  %% to `Info': the content of the plugin's README file, or an empty binary
  %% when it cannot be read. With any other options `Info' is returned
  %% unchanged.
  plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
      Readme =
          case file:read_file(readme_file(NameVsn)) of
              {ok, Content} -> Content;
              _ -> <<>>
          end,
      Info#{readme => Readme};
  plugins_readme(_NameVsn, _Options, Info) ->
      Info.
  %% Add `running_status' and `config_status' to `Info'.
  %%
  %% running_status: `stopped' when the application has no `vsn' key
  %% (i.e. it is not loaded), `running' when it is listed by running_apps/0,
  %% `loaded' otherwise.
  %% config_status: `not_configured', `enabled' or `disabled', depending on
  %% the configured entry whose name_vsn equals `NameVsn'.
  plugin_status(NameVsn, Info) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      RunningSt =
          case application:get_key(AppName, vsn) of
              undefined ->
                  stopped;
              {ok, _} ->
                  case lists:keymember(AppName, 1, running_apps()) of
                      true -> running;
                      false -> loaded
                  end
          end,
      %% collect the `enable' flag of every configured entry for this plugin
      CollectFlag = fun(#{name_vsn := Nv, enable := Enabled}, Acc) ->
          case bin(Nv) =:= bin(NameVsn) of
              true -> [Enabled | Acc];
              false -> Acc
          end
      end,
      ConfSt =
          case lists:foldr(CollectFlag, [], configured()) of
              [] -> not_configured;
              [true] -> enabled;
              [false] -> disabled
          end,
      Info#{
          running_status => RunningSt,
          config_status => ConfSt
      }.
  %% Validate the content of a plugin info file.
  %%
  %% The map must contain `name', `rel_vsn', `rel_apps' and `description';
  %% `<name>-<rel_vsn>' must equal `NameVsn', and `rel_apps' must be a
  %% non-empty list whose elements are all accepted by parse_name_vsn/1.
  %% Returns the info map unchanged, or throws a map describing the problem.
  check_plugin(
      #{
          <<"name">> := Name,
          <<"rel_vsn">> := Vsn,
          <<"rel_apps">> := Apps,
          <<"description">> := _
      } = Info,
      NameVsn,
      FilePath
  ) ->
      Given = bin(NameVsn),
      Declared = bin([Name, "-", Vsn]),
      case Given of
          Declared ->
              AssertApp = fun(App) -> {ok, _, _} = parse_name_vsn(App) end,
              try
                  %% must be a non-empty list ...
                  [_ | _] = Apps,
                  %% ... of <app>-<vsn> strings
                  lists:foreach(AssertApp, Apps)
              catch
                  _:_ ->
                      throw(#{
                          msg => "bad_rel_apps",
                          rel_apps => Apps,
                          hint => "A non-empty string list of app_name-app_vsn format"
                      })
              end,
              Info;
          _ ->
              throw(#{
                  msg => "name_vsn_mismatch",
                  name_vsn => NameVsn,
                  path => FilePath,
                  name => Name,
                  rel_vsn => Vsn
              })
      end;
  check_plugin(_What, NameVsn, File) ->
      %% at least one mandatory field is missing
      throw(#{
          msg => "bad_info_file_content",
          mandatory_fields => [rel_vsn, name, rel_apps, description],
          name_vsn => NameVsn,
          path => File
      }).
  %% Load the code of every application listed in `rel_apps' from the
  %% plugin's install directory, then start them in the listed order.
  load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
      LibDir = filename:join([install_dir(), RelNameVsn]),
      RunningApps = running_apps(),
      %% phase 1: load the apps and their beam code
      LoadOne = fun(AppNameVsn) ->
          {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
          EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
          ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
          AppName
      end,
      AppNames = lists:map(LoadOne, Apps),
      %% phase 2: start them
      lists:foreach(fun(AppName) -> start_app(AppName) end, AppNames).
  %% Load one plugin application unless it is already running.
  %%
  %% * not running: load its code from `Ebin' via do_load_plugin_app/2;
  %% * running with the same version: nothing to do;
  %% * running with a different version: keep the running one and log a
  %%   warning.
  %%
  %% Always returns `ok' (or throws from do_load_plugin_app/2). The `ok' in
  %% the version-mismatch branch is explicit, so the result does not depend
  %% on what the logging macro evaluates to; the caller asserts on `ok'.
  load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
      case lists:keyfind(AppName, 1, RunningApps) of
          false ->
              do_load_plugin_app(AppName, Ebin);
          {_, Vsn} ->
              case bin(Vsn) =:= bin(AppVsn) of
                  true ->
                      %% already started on the exact version
                      ok;
                  false ->
                      %% running but a different version
                      ?SLOG(warning, #{
                          msg => "plugin_app_already_running",
                          name => AppName,
                          running_vsn => Vsn,
                          loading_vsn => AppVsn
                      }),
                      ok
              end
      end.
  %% Put `Ebin' at the front of the code path, (re)load every `*.beam' file
  %% found in it, and load the application `AppName'.
  %% Throws a map when a beam file or the application cannot be loaded.
  do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
      do_load_plugin_app(AppName, binary_to_list(Ebin));
  do_load_plugin_app(AppName, Ebin) ->
      _ = code:add_patha(Ebin),
      LoadBeam = fun(BeamFile) ->
          Module = list_to_atom(filename:basename(BeamFile, ".beam")),
          %% drop old code of the module before loading the new version
          _ = code:purge(Module),
          case code:load_file(Module) of
              {error, LoadError} ->
                  throw(#{
                      msg => "failed_to_load_plugin_beam",
                      path => BeamFile,
                      reason => LoadError
                  });
              {module, _} ->
                  ok
          end
      end,
      BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
      lists:foreach(LoadBeam, BeamFiles),
      case application:load(AppName) of
          ok ->
              ok;
          {error, {already_loaded, _}} ->
              ok;
          {error, AppError} ->
              throw(#{
                  msg => "failed_to_load_plugin_app",
                  name => AppName,
                  reason => AppError
              })
      end.
  %% Start `App' and its dependencies, giving up after 10 seconds.
  %% Throws a map when starting fails or does not finish in time.
  start_app(App) ->
      Outcome = run_with_timeout(application, ensure_all_started, [App], 10_000),
      case Outcome of
          {ok, {ok, []}} ->
              %% everything was running already
              ok;
          {ok, {ok, Started}} ->
              ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
          {ok, {error, StartError}} ->
              throw(#{
                  msg => "failed_to_start_app",
                  app => App,
                  reason => StartError
              });
          {error, RunError} ->
              throw(#{
                  msg => "failed_to_start_plugin_app",
                  app => App,
                  reason => RunError
              })
      end.
  %% Stop all apps installed by the plugin package, except the ones other
  %% running apps still depend on.
  %% Returns `ok' (also when some apps had to be left running, which is
  %% only logged) or `{error, Reason}' when stopping failed.
  ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
      %% names of the plugin's apps that are candidates for stopping
      Candidates = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
      StopAll = fun() -> stop_apps(Candidates) end,
      case tryit("stop_apps", StopAll) of
          {error, Reason} ->
              {error, Reason};
          {ok, []} ->
              %% all apps stopped
              ok;
          {ok, StillRunning} ->
              ?SLOG(warning, #{
                  msg => "unabled_to_stop_plugin_apps",
                  apps => StillRunning,
                  reason => "running_apps_still_depends_on_this_apps"
              }),
              ok
      end.
  %% Elixir plugins may bundle Elixir itself when they target a non-Elixir
  %% EMQX release. When the EMQX release already includes Elixir, neither
  %% Elixir nor IEx may be stopped together with the plugin.
  -ifdef(EMQX_ELIXIR).
  %% Apps that belong to the release and must never be stopped by a plugin.
  is_protected_app(App) ->
      lists:member(App, [elixir, iex]).

  %% lists:filtermap/2 callback: keep the app name unless it is protected.
  parse_name_vsn_for_stopping(NameVsn) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      (not is_protected_app(AppName)) andalso {true, AppName}.
  %% ELSE ifdef(EMQX_ELIXIR)
  -else.
  %% lists:filtermap/2 callback: every app of the plugin may be stopped.
  parse_name_vsn_for_stopping(NameVsn) ->
      {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
      {true, AppName}.
  %% END ifdef(EMQX_ELIXIR)
  -endif.
  %% Stop the given apps in as many passes as it takes: an app that is
  %% still needed in one pass may become stoppable once its dependants are
  %% gone. Returns `{ok, Remaining}' where `Remaining' are the apps that
  %% could not be stopped (`[]' when all were stopped).
  stop_apps(Apps) ->
      {ok, Remaining} = do_stop_apps(Apps, [], running_apps()),
      case Remaining of
          [] ->
              %% all stopped
              {ok, []};
          Apps ->
              %% no progress in this pass, give up
              {ok, Apps};
          _ ->
              %% some progress, try again with what is left
              stop_apps(Remaining)
      end.
  %% One pass over `Apps': stop every app that none of `RunningApps' needs
  %% and collect the others. Returns `{ok, NotStopped}' in the original
  %% order, preceded by whatever was passed in as the accumulator.
  do_stop_apps(Apps, Remain0, RunningApps) ->
      Visit = fun(App, Kept) ->
          case is_needed_by_any(App, RunningApps) of
              true ->
                  [App | Kept];
              false ->
                  ok = stop_app(App),
                  Kept
          end
      end,
      Remain = lists:foldl(Visit, Remain0, Apps),
      {ok, lists:reverse(Remain)}.
  %% Stop `App' and unload its modules and application spec.
  %% An app that was not started is unloaded all the same; any other stop
  %% failure is thrown as a map.
  stop_app(App) ->
      StopResult = application:stop(App),
      case StopResult of
          {error, {not_started, App}} ->
              ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
              ok = unload_module_and_app(App);
          {error, Reason} ->
              throw(#{msg => "failed_to_stop_app", app => App, reason => Reason});
          ok ->
              ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
              ok = unload_module_and_app(App)
      end.
  %% Soft-purge every module listed in the application spec of `App' (when
  %% the spec is available) and unload the application. The result of the
  %% unload is ignored; always returns `ok'.
  unload_module_and_app(App) ->
      Modules =
          case application:get_key(App, modules) of
              {ok, Listed} -> Listed;
              _ -> []
          end,
      lists:foreach(fun(Module) -> code:soft_purge(Module) end, Modules),
      _ = application:unload(App),
      ok.
  %% True when at least one of the `{App, Vsn}' entries in the second
  %% argument lists `AppToStop' as a dependency (see is_needed_by/2).
  is_needed_by_any(_AppToStop, []) ->
      false;
  is_needed_by_any(AppToStop, [{RunningApp, _RunningAppVsn} | Rest]) ->
      is_needed_by(AppToStop, RunningApp) orelse is_needed_by_any(AppToStop, Rest).
  %% True when `AppToStop' is listed in the `applications' key of
  %% `RunningApp'. An app never needs itself, and an app without a loaded
  %% spec needs nothing.
  is_needed_by(AppToStop, RunningApp) when AppToStop =:= RunningApp ->
      false;
  is_needed_by(AppToStop, RunningApp) ->
      case application:get_key(RunningApp, applications) of
          undefined -> false;
          {ok, Deps} -> lists:member(AppToStop, Deps)
      end.
  %% Update the plugins config below `?CONF_ROOT'.
  %%
  %% `Key'/`Path' is a single atom or a list of path segments. The last
  %% argument selects the update function: `local' uses
  %% emqx:update_config/3, `global' uses emqx_conf:update/3.
  %% Returns `ok', or the error returned by the update function.
  do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) ->
      do_put_config_internal([Key], Value, ConfLocation);
  do_put_config_internal(Path, Values, local) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
      Result = emqx:update_config([?CONF_ROOT | Path], bin_key(Values), UpdateOpts),
      case Result of
          {ok, _} -> ok;
          _ -> Result
      end;
  do_put_config_internal(Path, Values, global) when is_list(Path) ->
      UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
      Result = emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), UpdateOpts),
      case Result of
          {ok, _} -> ok;
          _ -> Result
      end.
  %%--------------------------------------------------------------------
  %% `emqx_config_handler' API
  %%--------------------------------------------------------------------

  %% Called after the config was updated. For an update of the plugins root,
  %% the `states' lists of both configs are indexed by `name_vsn', diffed,
  %% and every changed entry is passed to enable_disable_plugin/2.
  %% Updates of any other path are ignored.
  post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
      IndexByNameVsn = fun(States) ->
          maps:from_list([{NV, S} || #{name_vsn := NV} = S <- States])
      end,
      Diff = emqx_utils_maps:diff_maps(IndexByNameVsn(NewStates), IndexByNameVsn(OldStates)),
      #{changed := Changed} = Diff,
      maps:foreach(fun enable_disable_plugin/2, Changed),
      ok;
  post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
      ok.
  %% React to a changed plugin state. `Diff' is a pair of state maps:
  %% when `enable' goes from `true' (first) to `false' (second) the plugin
  %% is stopped, for the opposite change it is started; anything else is
  %% ignored. Always returns `ok'.
  enable_disable_plugin(NameVsn, Diff) ->
      case Diff of
          {#{enable := true}, #{enable := false}} ->
              %% errors are already logged in this fn
              _ = ensure_stopped(NameVsn),
              ok;
          {#{enable := false}, #{enable := true}} ->
              %% errors are already logged in this fn
              _ = ensure_started(NameVsn),
              ok;
          _ ->
              ok
      end.
  %%--------------------------------------------------------------------
  %% Helper functions
  %%--------------------------------------------------------------------

  %% The configured `install_dir', or an empty string when it is not set.
  install_dir() ->
      Default = "",
      get_config_interal(install_dir, Default).
  %% Store the plugin states using the `local' config location.
  put_configured(Configured) ->
      put_configured(Configured, local).

  %% Store the plugin states under the `states' key.
  %% Crashes with `badmatch' when the config update does not return `ok'.
  put_configured(Configured, ConfLocation) ->
      States = bin_key(Configured),
      ok = do_put_config_internal(states, States, ConfLocation).
  %% The configured plugin states, or an empty list when none are set.
  configured() ->
      Default = [],
      get_config_interal(states, Default).
  %% Apply `ActionFun' to every configured plugin state. `ActionFun' must
  %% return a list; the concatenation of all returned lists is treated as
  %% the errors, which are traced and logged. Always returns `ok'.
  for_plugins(ActionFun) ->
      Errors = lists:flatmap(ActionFun, configured()),
      case Errors of
          [_ | _] ->
              ErrMeta = #{function => ActionFun, errors => Errors},
              ?tp(
                  for_plugins_action_error_occurred,
                  ErrMeta
              ),
              ?SLOG(error, ErrMeta#{msg => "for_plugins_action_error_occurred"}),
              ok;
          [] ->
              ok
      end.
  %% Post-install steps: load the config schema / prepare the config dir
  %% (result ignored), then make sure the plugin has a state entry.
  maybe_post_op_after_installed(NameVsn0, Mode) ->
      NameVsnStr = wrap_to_list(NameVsn0),
      _ = maybe_load_config_schema(NameVsnStr, Mode),
      ok = maybe_ensure_state(NameVsnStr).
  %% Make sure the plugin `NameVsn' has a state entry in the config.
  %%
  %% * Already configured: re-apply the existing `enable' value of every
  %%   matching entry.
  %% * Not configured (clean installation): add it with `enable = false'.
  %%
  %% Entries are compared after normalising both sides with bin/1, the same
  %% way plugin_status/2 does. Comparing the raw terms would make a
  %% configured plugin look unconfigured whenever one side is a binary and
  %% the other a string, and its state would then be reset to disabled.
  %%
  %% Always returns `ok'; results of ensure_state/4 are ignored.
  maybe_ensure_state(NameVsn) ->
      Wanted = bin(NameVsn),
      Matching = [S || #{name_vsn := NV} = S <- configured(), bin(NV) =:= Wanted],
      case Matching of
          [] ->
              ?SLOG(info, #{msg => "plugin_not_configured", name_vsn => NameVsn}),
              %% Clean installation, no config, ensure with `Enable = false`
              _ = ensure_state(NameVsn, no_move, false, global),
              ok;
          [_ | _] ->
              %% Configured, using existed cluster config
              lists:foreach(
                  fun(#{name_vsn := NV, enable := Bool}) ->
                      _ = ensure_state(NV, no_move, Bool, global),
                      ok
                  end,
                  Matching
              )
      end.
  %% Load the plugin's avsc schema when the plugin has one and the file
  %% exists (result ignored), then prepare the plugin config directory.
  %% Returns the result of maybe_create_config_dir/2.
  maybe_load_config_schema(NameVsn, Mode) ->
      AvscPath = avsc_file_path(NameVsn),
      HasSchemaFile = with_plugin_avsc(NameVsn) andalso filelib:is_regular(AvscPath),
      _ = HasSchemaFile andalso do_load_config_schema(NameVsn, AvscPath),
      maybe_create_config_dir(NameVsn, Mode).
  %% Register the avsc schema of the plugin. Best effort: both success and
  %% any `{error, _}' (including `already_exists') result in `ok'.
  do_load_config_schema(NameVsn, AvscPath) ->
      SchemaName = bin(NameVsn),
      case emqx_plugins_serde:add_schema(SchemaName, AvscPath) of
          {error, already_exists} -> ok;
          {error, _Reason} -> ok;
          ok -> ok
      end.
  %% Create the plugin config directory, but only for plugins for which
  %% with_plugin_avsc/1 is true. Returns `false' when it is not, otherwise
  %% the result of do_create_config_dir/2.
  maybe_create_config_dir(NameVsn, Mode) ->
      HasSchema = with_plugin_avsc(NameVsn),
      HasSchema andalso do_create_config_dir(NameVsn, Mode).
  %% Create the data directory of the plugin and then make sure a plugin
  %% config exists in it. Returns `ok', or `{error, _}' when the directory
  %% name cannot be determined or the directory cannot be created.
  do_create_config_dir(NameVsn, Mode) ->
      case plugin_data_dir(NameVsn) of
          {error, Reason} ->
              {error, {gen_config_dir_failed, Reason}};
          ConfigDir ->
              Created = filelib:ensure_path(ConfigDir),
              case Created of
                  {error, MkdirError} ->
                      ?SLOG(warning, #{
                          msg => "failed_to_create_plugin_config_dir",
                          dir => ConfigDir,
                          reason => MkdirError
                      }),
                      {error, {mkdir_failed, ConfigDir, MkdirError}};
                  ok ->
                      %% get config from other nodes or get from tarball
                      _ = maybe_ensure_plugin_config(NameVsn, Mode),
                      ok
              end
      end.
  %% Ensure the plugin config, but only for plugins for which
  %% with_plugin_avsc/1 returns `true'; otherwise do nothing.
  -spec maybe_ensure_plugin_config(name_vsn(), ?fresh_install | ?normal) -> ok.
  maybe_ensure_plugin_config(NameVsn, Mode) ->
      case with_plugin_avsc(NameVsn) of
          true -> ensure_plugin_config({NameVsn, Mode});
          _ -> ok
      end.
  %% Ensure the plugin config according to the install mode:
  %% in normal mode the other running nodes are consulted, on a fresh
  %% install the default config file of the package is used.
  -spec ensure_plugin_config({name_vsn(), ?fresh_install | ?normal}) -> ok.
  ensure_plugin_config({NameVsn, ?normal}) ->
      ThisNode = node(),
      Peers = lists:filter(fun(N) -> N /= ThisNode end, mria:running_nodes()),
      ensure_plugin_config(NameVsn, Peers);
  ensure_plugin_config({NameVsn, ?fresh_install}) ->
      LogMeta = #{
          msg => "default_plugin_config_used",
          name_vsn => NameVsn,
          hint => "fresh_install"
      },
      ?SLOG(debug, LogMeta),
      cp_default_config_file(NameVsn).
  %% Ensure the plugin config, preferring the config of another node.
  %% Without other nodes, or when none of them returns a config map, the
  %% default config file is used. A config obtained from the cluster is
  %% written to the plugin config file before it is loaded.
  -spec ensure_plugin_config(name_vsn(), list()) -> ok.
  ensure_plugin_config(NameVsn, []) ->
      LogMeta = #{
          msg => "local_plugin_config_used",
          name_vsn => NameVsn,
          reason => "no_other_running_nodes"
      },
      ?SLOG(debug, LogMeta),
      cp_default_config_file(NameVsn);
  ensure_plugin_config(NameVsn, Nodes) ->
      case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
          {ok, ClusterConfig} when is_map(ClusterConfig) ->
              HoconBin = hocon_pp:do(ClusterConfig, #{}),
              ConfigFile = plugin_config_file(NameVsn),
              ok = filelib:ensure_dir(ConfigFile),
              ok = file:write_file(ConfigFile, HoconBin),
              ensure_config_map(NameVsn);
          _ ->
              ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
              cp_default_config_file(NameVsn)
      end.
  %% Copy the default hocon file of the package into the plugin config
  %% directory, unless a config file is already there, and load the
  %% resulting config. This is what is used when no config could be
  %% obtained from other nodes.
  %%
  %% When the package has no default file, or the destination already
  %% exists, nothing is copied and the existing config is loaded. A failed
  %% copy is only logged.
  -spec cp_default_config_file(name_vsn()) -> ok.
  cp_default_config_file(NameVsn) ->
      Source = default_plugin_config_file(NameVsn),
      Destination = plugin_config_file(NameVsn),
      case filelib:is_regular(Source) of
          true ->
              case filelib:is_regular(Destination) of
                  true ->
                      %% already configured, keep the existing file
                      ?SLOG(debug, #{
                          msg => "plugin_config_file_already_existed", name_vsn => NameVsn
                      }),
                      ensure_config_map(NameVsn);
                  false ->
                      ok = filelib:ensure_dir(Destination),
                      case file:copy(Source, Destination) of
                          {error, Reason} ->
                              ?SLOG(warning, #{
                                  msg => "failed_to_copy_plugin_default_hocon_config",
                                  source => Source,
                                  destination => Destination,
                                  reason => Reason
                              });
                          {ok, _} ->
                              ensure_config_map(NameVsn)
                      end
              end;
          _ ->
              ensure_config_map(NameVsn)
      end.
  %% Read the plugin config file as a map and store it with put_config/3.
  %% For plugins with an avsc schema the map is validated first (see
  %% do_ensure_config_map/2); without a schema it is stored directly.
  %% A config file that cannot be read is only logged.
  ensure_config_map(NameVsn) ->
      case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
          {ok, ConfigJsonMap} ->
              case with_plugin_avsc(NameVsn) of
                  false ->
                      ?SLOG(debug, #{
                          msg => "put_plugin_config_directly",
                          hint => "plugin_without_config_schema",
                          name_vsn => NameVsn
                      }),
                      put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema);
                  true ->
                      do_ensure_config_map(NameVsn, ConfigJsonMap)
              end;
          _ ->
              ?SLOG(warning, #{
                  msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn
              }),
              ok
      end.
  %% Decode the config map with decode_plugin_config_map/2 and store it
  %% together with the decoded value. A config that fails to decode is
  %% logged and not stored.
  do_ensure_config_map(NameVsn, ConfigJsonMap) ->
      Decoded = decode_plugin_config_map(NameVsn, ConfigJsonMap),
      case Decoded of
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "plugin_config_validation_failed",
                  name_vsn => NameVsn,
                  reason => Reason
              }),
              ok;
          {ok, AvroValue} ->
              put_config(NameVsn, ConfigJsonMap, AvroValue)
      end.
  %% @private Write the new config to a temporary file next to the plugin
  %% config file, then back up the current config file (timestamp suffix)
  %% and move the temporary file into place.
  %% A failed write is logged and otherwise ignored.
  backup_and_write_hocon_bin(NameVsn, HoconBin) ->
      ConfigFile = plugin_config_file(NameVsn),
      %% this may fail, but we don't care
      %% e.g. read-only file system
      _ = filelib:ensure_dir(ConfigFile),
      TmpFile = ConfigFile ++ ".tmp",
      case file:write_file(TmpFile, HoconBin) of
          {error, Reason} ->
              ?SLOG(error, #{
                  msg => "failed_to_save_plugin_conf_file",
                  hint =>
                      "The updated cluster config is not saved on this node, please check the file system.",
                  filename => TmpFile,
                  reason => Reason
              }),
              %% e.g. read-only, it's not the end of the world
              ok;
          ok ->
              backup_and_replace(ConfigFile, TmpFile)
      end.
  %% Rename `Path' to a timestamped `.bak' file and move `TmpPath' into its
  %% place, then prune old backups. When `Path' does not exist yet, the
  %% temporary file is just moved into place. Any other backup failure is
  %% logged, and the temporary file is not moved.
  backup_and_replace(Path, TmpPath) ->
      Backup = lists:append([Path, ".", now_time(), ".bak"]),
      case file:rename(Path, Backup) of
          {error, enoent} ->
              %% not created yet
              ok = file:rename(TmpPath, Path);
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_backup_plugin_conf_file",
                  filename => Backup,
                  reason => Reason
              }),
              ok;
          ok ->
              ok = file:rename(TmpPath, Path),
              ok = prune_backup_files(Path)
      end.
  %% Delete old backups of `Path'. Backups are the files named
  %% `Path.<timestamp>.bak'; the `?MAX_KEEP_BACKUP_CONFIGS' files that sort
  %% last by name are kept, the others are deleted. Failed deletions are
  %% logged. Returns `ok'.
  prune_backup_files(Path) ->
      Candidates = filelib:wildcard(Path ++ ".*"),
      Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
      Backups = [F || F <- Candidates, re:run(F, Re) =/= nomatch],
      %% descending by name, so the files to keep come first
      Sorted = lists:reverse(lists:sort(Backups)),
      KeepCount = min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)),
      Deletes = lists:nthtail(KeepCount, Sorted),
      DeleteOne = fun(File) ->
          case file:delete(File) of
              {error, Reason} ->
                  ?SLOG(warning, #{
                      msg => "failed_to_delete_backup_plugin_conf_file",
                      filename => File,
                      reason => Reason
                  }),
                  ok;
              ok ->
                  ok
          end
      end,
      lists:foreach(DeleteOne, Deletes).
  %% Build a zero-arity fun that reads `Path' when called.
  %%
  %% In raw mode the fun returns `{ok, Binary}'; in map mode the file is
  %% loaded with hocon and returned as `{ok, PlainMap}'. On failure the fun
  %% throws `#{msg => Msg, reason => Reason}'.
  read_file_fun(Path, Msg, #{read_mode := ?RAW_BIN}) ->
      fun() ->
          case file:read_file(Path) of
              {error, Reason} ->
                  throw(#{msg => Msg, reason => Reason});
              {ok, _Bin} = Ok ->
                  Ok
          end
      end;
  read_file_fun(Path, Msg, #{read_mode := ?JSON_MAP}) ->
      fun() ->
          case hocon:load(Path, #{format => richmap}) of
              {error, Reason} ->
                  throw(#{msg => Msg, reason => Reason});
              {ok, RichMap} ->
                  PlainMap = hocon_maps:ensure_plain(RichMap),
                  {ok, PlainMap}
          end
      end.
  %% Directories

  %% Directory of the plugin `NameVsn' inside the install directory.
  -spec plugin_dir(name_vsn()) -> string().
  plugin_dir(NameVsn) ->
      Dir = filename:join([install_dir(), NameVsn]),
      wrap_to_list(Dir).
  %% The `priv' directory of the plugin. It is located inside the app
  %% directory resolved by app_dir/2 from the plugin info; when the info
  %% cannot be read or the app directory cannot be resolved, the `priv'
  %% directory directly under the plugin root directory is assumed.
  -spec plugin_priv_dir(name_vsn()) -> string().
  plugin_priv_dir(NameVsn) ->
      Fallback = fun() ->
          wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
      end,
      case read_plugin_info(NameVsn, #{fill_readme => false}) of
          {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ->
              case app_dir(Name, Apps) of
                  {ok, AppDir} ->
                      wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
                  _ ->
                      Fallback()
              end;
          _ ->
              Fallback()
      end.
  %% Data directory of the plugin: `<data_dir>/plugins/<name>', where the
  %% name is taken from `NameVsn' without the version.
  %% Returns `{error, Reason}' (after logging) when `NameVsn' cannot be
  %% parsed.
  -spec plugin_data_dir(name_vsn()) -> string() | {error, Reason :: string()}.
  plugin_data_dir(NameVsn) ->
      Parsed = parse_name_vsn(NameVsn),
      case Parsed of
          {error, Reason} ->
              ?SLOG(warning, #{
                  msg => "failed_to_generate_plugin_config_dir_for_plugin",
                  plugin_namevsn => NameVsn,
                  reason => Reason
              }),
              {error, Reason};
          {ok, NameAtom, _Vsn} ->
              Dir = filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]),
              wrap_to_list(Dir)
      end.
  %% The `certs' directory inside the plugin data directory.
  plugin_certs_dir(NameVsn) ->
      CertsDir = filename:join([plugin_data_dir(NameVsn), "certs"]),
      wrap_to_list(CertsDir).
  %% Files

  %% Path of the plugin package: `<install_dir>/<NameVsn>.tar.gz'.
  -spec pkg_file_path(name_vsn()) -> string().
  pkg_file_path(NameVsn) ->
      PkgName = bin([NameVsn, ".tar.gz"]),
      wrap_to_list(filename:join([install_dir(), PkgName])).
  %% Path of the plugin info file: `release.json' in the plugin directory.
  -spec info_file_path(name_vsn()) -> string().
  info_file_path(NameVsn) ->
      InfoFile = filename:join([plugin_dir(NameVsn), "release.json"]),
      wrap_to_list(InfoFile).
  %% Path of the config schema: `config_schema.avsc' in the priv directory.
  -spec avsc_file_path(name_vsn()) -> string().
  avsc_file_path(NameVsn) ->
      AvscFile = filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"]),
      wrap_to_list(AvscFile).
  %% Path of the plugin config: `config.hocon' in the plugin data directory.
  -spec plugin_config_file(name_vsn()) -> string().
  plugin_config_file(NameVsn) ->
      ConfigFile = filename:join([plugin_data_dir(NameVsn), "config.hocon"]),
      wrap_to_list(ConfigFile).
  %% Path of the default config shipped with the package: `config.hocon' in
  %% the priv directory. Should only be used while installing a plugin.
  -spec default_plugin_config_file(name_vsn()) -> string().
  default_plugin_config_file(NameVsn) ->
      DefaultFile = filename:join([plugin_priv_dir(NameVsn), "config.hocon"]),
      wrap_to_list(DefaultFile).
  %% Path of the i18n file: `config_i18n.json' in the priv directory.
  -spec i18n_file_path(name_vsn()) -> string().
  i18n_file_path(NameVsn) ->
      I18nFile = filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"]),
      wrap_to_list(I18nFile).
  %% Path of the md5sum file: the plugin directory path followed by
  %% `.tar.gz.md5sum'.
  -spec md5sum_file(name_vsn()) -> string().
  md5sum_file(NameVsn) ->
      lists:append(plugin_dir(NameVsn), ".tar.gz.md5sum").
  %% Path of the readme: `README.md' in the plugin directory.
  -spec readme_file(name_vsn()) -> string().
  readme_file(NameVsn) ->
      ReadmeFile = filename:join([plugin_dir(NameVsn), "README.md"]),
      wrap_to_list(ReadmeFile).
  %% The currently running applications as `{Name, Vsn}' pairs.
  running_apps() ->
      Running = application:which_applications(infinity),
      [{Name, Vsn} || {Name, _Description, Vsn} <- Running].
  %% @private The current local time as a flat string of the form
  %% `YYYY.MM.DD.hh.mm.ss.mmm'. This is the same human-readable timestamp
  %% format as used in hocon-cli generated app.<time>.config file names.
  now_time() ->
      NowMs = os:system_time(millisecond),
      {Date, Time} = calendar:system_time_to_local_time(NowMs, millisecond),
      Millis = NowMs rem 1000,
      Fields = tuple_to_list(Date) ++ tuple_to_list(Time) ++ [Millis],
      Formatted = io_lib:format("~0p.~2..0b.~2..0b.~2..0b.~2..0b.~2..0b.~3..0b", Fields),
      lists:flatten(Formatted).
  %% Convert the keys of a map to binaries with bin/1 (values are kept
  %% as they are, nested maps are not visited). A list starting with a map
  %% is converted element by element; any other term is returned unchanged.
  bin_key(Map) when is_map(Map) ->
      PutBinKey = fun(Key, Value, Acc) -> Acc#{bin(Key) => Value} end,
      maps:fold(PutBinKey, #{}, Map);
  bin_key([#{} | _] = Maps) ->
      [bin_key(Elem) || Elem <- Maps];
  bin_key(Other) ->
      Other.
  %% Convert an atom or character data to a binary (UTF-8); a binary is
  %% returned unchanged.
  bin(Bin) when is_binary(Bin) ->
      Bin;
  bin(Chars) when is_list(Chars) ->
      unicode:characters_to_binary(Chars, utf8);
  bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8).
  %% Flatten iodata (e.g. a path made of strings and binaries) into a
  %% plain string.
  wrap_to_list(Path) ->
      Flat = iolist_to_binary(Path),
      binary_to_list(Flat).
  %% Evaluate `apply(Module, Function, Args)' in a separate, monitored
  %% process and wait at most `Timeout' milliseconds for the result.
  %%
  %% Returns:
  %% * `{ok, Result}' when the call returned in time;
  %% * `{error, timeout}' when it did not; the worker is killed;
  %% * `{error, Reason}' when the worker died before replying, where
  %%   `Reason' is its exit reason. This is reported right away instead of
  %%   only after the timeout has elapsed.
  %%
  %% The reply is tagged with a fresh reference and the worker is
  %% monitored, so no reply, monitor or timer message of this call is left
  %% behind in the caller's mailbox.
  run_with_timeout(Module, Function, Args, Timeout) ->
      Caller = self(),
      Tag = make_ref(),
      {Pid, MRef} = spawn_monitor(fun() ->
          Caller ! {Tag, apply(Module, Function, Args)}
      end),
      receive
          {Tag, Result} ->
              %% the reply is sent before the worker exits, so it is always
              %% seen before the 'DOWN' message; drop the latter
              _ = erlang:demonitor(MRef, [flush]),
              {ok, Result};
          {'DOWN', MRef, process, Pid, Reason} ->
              {error, Reason}
      after Timeout ->
          exit(Pid, kill),
          %% wait until the worker is gone; after that, a reply it may have
          %% sent just before being killed is already in the mailbox
          receive
              {'DOWN', MRef, process, Pid, _} -> ok
          end,
          receive
              {Tag, _LateResult} -> ok
          after 0 -> ok
          end,
          {error, timeout}
      end.