| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_plugins).
- -include_lib("emqx/include/logger.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include("emqx_plugins.hrl").
- -ifdef(TEST).
- -include_lib("eunit/include/eunit.hrl").
- -endif.
- -export([
- ensure_installed/1,
- ensure_uninstalled/1,
- ensure_enabled/1,
- ensure_enabled/2,
- ensure_enabled/3,
- ensure_disabled/1,
- purge/1,
- delete_package/1
- ]).
- -export([
- ensure_started/0,
- ensure_started/1,
- ensure_stopped/0,
- ensure_stopped/1,
- restart/1,
- list/0,
- describe/1,
- parse_name_vsn/1
- ]).
- -export([
- get_config/2,
- put_config/2,
- get_tar/1
- ]).
- %% `emqx_config_handler' API
- -export([
- post_config_update/5
- ]).
- %% internal
- -export([do_ensure_started/1]).
- -export([
- install_dir/0
- ]).
- -ifdef(TEST).
- -compile(export_all).
- -compile(nowarn_export_all).
- -endif.
- %% "my_plugin-0.1.0"
- -type name_vsn() :: binary() | string().
- %% the parse result of the JSON info file
- -type plugin() :: map().
- -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
- %%--------------------------------------------------------------------
- %% APIs
- %%--------------------------------------------------------------------
- %% @doc Describe a plugin.
- -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
- describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
- %% @doc Install a .tar.gz package placed in install_dir.
- -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
- ensure_installed(NameVsn) ->
- case read_plugin(NameVsn, #{}) of
- {ok, _} ->
- ok;
- {error, _} ->
- ok = purge(NameVsn),
- do_ensure_installed(NameVsn)
- end.
- do_ensure_installed(NameVsn) ->
- TarGz = pkg_file(NameVsn),
- case erl_tar:extract(TarGz, [compressed, memory]) of
- {ok, TarContent} ->
- ok = write_tar_file_content(install_dir(), TarContent),
- case read_plugin(NameVsn, #{}) of
- {ok, _} ->
- ok;
- {error, Reason} ->
- ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
- ok = delete_tar_file_content(install_dir(), TarContent),
- {error, Reason}
- end;
- {error, {_, enoent}} ->
- {error, #{
- reason => "failed_to_extract_plugin_package",
- path => TarGz,
- return => not_found
- }};
- {error, Reason} ->
- {error, #{
- reason => "bad_plugin_package",
- path => TarGz,
- return => Reason
- }}
- end.
- -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
- get_tar(NameVsn) ->
- TarGz = pkg_file(NameVsn),
- case file:read_file(TarGz) of
- {ok, Content} ->
- {ok, Content};
- {error, _} ->
- case maybe_create_tar(NameVsn, TarGz, install_dir()) of
- ok ->
- file:read_file(TarGz);
- Err ->
- Err
- end
- end.
- maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
- maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
- maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
- case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
- [_ | _] = PluginFiles ->
- InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
- PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
- erl_tar:create(TarGzName, PluginFiles1, [compressed]);
- _ ->
- {error, plugin_not_found}
- end.
- write_tar_file_content(BaseDir, TarContent) ->
- lists:foreach(
- fun({Name, Bin}) ->
- Filename = filename:join(BaseDir, Name),
- ok = filelib:ensure_dir(Filename),
- ok = file:write_file(Filename, Bin)
- end,
- TarContent
- ).
- delete_tar_file_content(BaseDir, TarContent) ->
- lists:foreach(
- fun({Name, _}) ->
- Filename = filename:join(BaseDir, Name),
- case filelib:is_file(Filename) of
- true ->
- TopDirOrFile = top_dir(BaseDir, Filename),
- ok = file:del_dir_r(TopDirOrFile);
- false ->
- %% probably already deleted
- ok
- end
- end,
- TarContent
- ).
- top_dir(BaseDir0, DirOrFile) ->
- BaseDir = normalize_dir(BaseDir0),
- case filename:dirname(DirOrFile) of
- RockBottom when RockBottom =:= "/" orelse RockBottom =:= "." ->
- throw({out_of_bounds, DirOrFile});
- BaseDir ->
- DirOrFile;
- Parent ->
- top_dir(BaseDir, Parent)
- end.
- normalize_dir(Dir) ->
- %% Get rid of possible trailing slash
- filename:join([Dir, ""]).
- -ifdef(TEST).
- normalize_dir_test_() ->
- [
- ?_assertEqual("foo", normalize_dir("foo")),
- ?_assertEqual("foo", normalize_dir("foo/")),
- ?_assertEqual("/foo", normalize_dir("/foo")),
- ?_assertEqual("/foo", normalize_dir("/foo/"))
- ].
- top_dir_test_() ->
- [
- ?_assertEqual("base/foo", top_dir("base", filename:join(["base", "foo", "bar"]))),
- ?_assertEqual("/base/foo", top_dir("/base", filename:join(["/", "base", "foo", "bar"]))),
- ?_assertEqual("/base/foo", top_dir("/base/", filename:join(["/", "base", "foo", "bar"]))),
- ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "base"]))),
- ?_assertThrow({out_of_bounds, _}, top_dir("/base", filename:join(["/", "foo", "bar"])))
- ].
- -endif.
- %% @doc Ensure files and directories for the given plugin are being deleted.
- %% If a plugin is running, or enabled, an error is returned.
- -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
- ensure_uninstalled(NameVsn) ->
- case read_plugin(NameVsn, #{}) of
- {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
- {error, #{
- reason => "bad_plugin_running_status",
- hint => "stop_the_plugin_first"
- }};
- {ok, #{config_status := enabled}} ->
- {error, #{
- reason => "bad_plugin_config_status",
- hint => "disable_the_plugin_first"
- }};
- _ ->
- purge(NameVsn),
- ensure_delete(NameVsn)
- end.
- ensure_delete(NameVsn0) ->
- NameVsn = bin(NameVsn0),
- List = configured(),
- put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
- ok.
- %% @doc Ensure a plugin is enabled to the end of the plugins list.
- -spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
- ensure_enabled(NameVsn) ->
- ensure_enabled(NameVsn, no_move).
- %% @doc Ensure a plugin is enabled at the given position of the plugin list.
- -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
- ensure_enabled(NameVsn, Position) ->
- ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
- -spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
- ensure_enabled(NameVsn, Position, ConfLocation) when
- ConfLocation =:= local; ConfLocation =:= global
- ->
- ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
- %% @doc Ensure a plugin is disabled.
- -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
- ensure_disabled(NameVsn) ->
- ensure_state(NameVsn, no_move, false, _ConfLocation = local).
- ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
- ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
- ensure_state(NameVsn, Position, State, ConfLocation) ->
- case read_plugin(NameVsn, #{}) of
- {ok, _} ->
- Item = #{
- name_vsn => NameVsn,
- enable => State
- },
- tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end);
- {error, Reason} ->
- {error, Reason}
- end.
- ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
- Configured = configured(),
- SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
- {Front, Rear} = lists:splitwith(SplitFun, Configured),
- NewConfigured =
- case Rear of
- [_ | More] when Position =:= no_move ->
- Front ++ [Item | More];
- [_ | More] ->
- add_new_configured(Front ++ More, Position, Item);
- [] ->
- add_new_configured(Configured, Position, Item)
- end,
- ok = put_configured(NewConfigured, ConfLocation).
- add_new_configured(Configured, no_move, Item) ->
- %% default to rear
- add_new_configured(Configured, rear, Item);
- add_new_configured(Configured, front, Item) ->
- [Item | Configured];
- add_new_configured(Configured, rear, Item) ->
- Configured ++ [Item];
- add_new_configured(Configured, {Action, NameVsn}, Item) ->
- SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
- {Front, Rear} = lists:splitwith(SplitFun, Configured),
- Rear =:= [] andalso
- throw(#{
- error => "position_anchor_plugin_not_configured",
- hint => "maybe_install_and_configure",
- name_vsn => NameVsn
- }),
- case Action of
- before ->
- Front ++ [Item | Rear];
- behind ->
- [Anchor | Rear0] = Rear,
- Front ++ [Anchor, Item | Rear0]
- end.
- %% @doc Delete the package file.
- -spec delete_package(name_vsn()) -> ok.
- delete_package(NameVsn) ->
- File = pkg_file(NameVsn),
- case file:delete(File) of
- ok ->
- ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
- ok;
- {error, enoent} ->
- ok;
- {error, Reason} ->
- ?SLOG(error, #{
- msg => "failed_to_delete_package_file",
- path => File,
- reason => Reason
- }),
- {error, Reason}
- end.
- %% @doc Delete extracted dir
- %% In case one lib is shared by multiple plugins.
- %% it might be the case that purging one plugin's install dir
- %% will cause deletion of loaded beams.
- %% It should not be a problem, because shared lib should
- %% reside in all the plugin install dirs.
- -spec purge(name_vsn()) -> ok.
- purge(NameVsn) ->
- Dir = dir(NameVsn),
- case file:del_dir_r(Dir) of
- ok ->
- ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
- {error, enoent} ->
- ok;
- {error, Reason} ->
- ?SLOG(error, #{
- msg => "failed_to_purge_plugin_dir",
- dir => Dir,
- reason => Reason
- }),
- {error, Reason}
- end.
- %% @doc Start all configured plugins are started.
- -spec ensure_started() -> ok.
- ensure_started() ->
- ok = for_plugins(fun ?MODULE:do_ensure_started/1).
- %% @doc Start a plugin from Management API or CLI.
- %% the input is a <name>-<vsn> string.
- -spec ensure_started(name_vsn()) -> ok | {error, term()}.
- ensure_started(NameVsn) ->
- case do_ensure_started(NameVsn) of
- ok ->
- ok;
- {error, Reason} ->
- ?SLOG(alert, #{
- msg => "failed_to_start_plugin",
- reason => Reason
- }),
- {error, Reason}
- end.
- %% @doc Stop all plugins before broker stops.
- -spec ensure_stopped() -> ok.
- ensure_stopped() ->
- for_plugins(fun ?MODULE:ensure_stopped/1).
- %% @doc Stop a plugin from Management API or CLI.
- -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
- ensure_stopped(NameVsn) ->
- tryit(
- "stop_plugin",
- fun() ->
- Plugin = do_read_plugin(NameVsn),
- ensure_apps_stopped(Plugin)
- end
- ).
- %% @doc Stop and then start the plugin.
- restart(NameVsn) ->
- case ensure_stopped(NameVsn) of
- ok -> ensure_started(NameVsn);
- {error, Reason} -> {error, Reason}
- end.
- %% @doc List all installed plugins.
- %% Including the ones that are installed, but not enabled in config.
- -spec list() -> [plugin()].
- list() ->
- Pattern = filename:join([install_dir(), "*", "release.json"]),
- All = lists:filtermap(
- fun(JsonFile) ->
- case read_plugin({file, JsonFile}, #{}) of
- {ok, Info} ->
- {true, Info};
- {error, Reason} ->
- ?SLOG(warning, Reason),
- false
- end
- end,
- filelib:wildcard(Pattern)
- ),
- list(configured(), All).
- %% Make sure configured ones are ordered in front.
- list([], All) ->
- All;
- list([#{name_vsn := NameVsn} | Rest], All) ->
- SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
- bin([Name, "-", Vsn]) =/= bin(NameVsn)
- end,
- case lists:splitwith(SplitF, All) of
- {_, []} ->
- ?SLOG(warning, #{
- msg => "configured_plugin_not_installed",
- name_vsn => NameVsn
- }),
- list(Rest, All);
- {Front, [I | Rear]} ->
- [I | list(Rest, Front ++ Rear)]
- end.
- do_ensure_started(NameVsn) ->
- tryit(
- "start_plugins",
- fun() ->
- case ensure_exists_and_installed(NameVsn) of
- ok ->
- Plugin = do_read_plugin(NameVsn),
- ok = load_code_start_apps(NameVsn, Plugin);
- {error, plugin_not_found} ->
- ?SLOG(error, #{
- msg => "plugin_not_found",
- name_vsn => NameVsn
- })
- end
- end
- ).
- %% try the function, catch 'throw' exceptions as normal 'error' return
- %% other exceptions with stacktrace logged.
- tryit(WhichOp, F) ->
- try
- F()
- catch
- throw:Reason ->
- %% thrown exceptions are known errors
- %% translate to a return value without stacktrace
- {error, Reason};
- error:Reason:Stacktrace ->
- %% unexpected errors, log stacktrace
- ?SLOG(warning, #{
- msg => "plugin_op_failed",
- which_op => WhichOp,
- exception => Reason,
- stacktrace => Stacktrace
- }),
- {error, {failed, WhichOp}}
- end.
- %% read plugin info from the JSON file
- %% returns {ok, Info} or {error, Reason}
- read_plugin(NameVsn, Options) ->
- tryit(
- "read_plugin_info",
- fun() -> {ok, do_read_plugin(NameVsn, Options)} end
- ).
- do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
- do_read_plugin({file, InfoFile}, Options) ->
- [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
- case hocon:load(InfoFile, #{format => richmap}) of
- {ok, RichMap} ->
- Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
- Info1 = plugins_readme(NameVsn, Options, Info0),
- plugin_status(NameVsn, Info1);
- {error, Reason} ->
- throw(#{
- error => "bad_info_file",
- path => InfoFile,
- return => Reason
- })
- end;
- do_read_plugin(NameVsn, Options) ->
- do_read_plugin({file, info_file(NameVsn)}, Options).
- ensure_exists_and_installed(NameVsn) ->
- case filelib:is_dir(dir(NameVsn)) of
- true ->
- ok;
- false ->
- %% Do we have the package, but it's not extracted yet?
- case get_tar(NameVsn) of
- {ok, TarContent} ->
- ok = file:write_file(pkg_file(NameVsn), TarContent),
- ok = do_ensure_installed(NameVsn);
- _ ->
- %% If not, try to get it from the cluster.
- do_get_from_cluster(NameVsn)
- end
- end.
- do_get_from_cluster(NameVsn) ->
- Nodes = [N || N <- mria:running_nodes(), N /= node()],
- case get_from_any_node(Nodes, NameVsn, []) of
- {ok, TarContent} ->
- ok = file:write_file(pkg_file(NameVsn), TarContent),
- ok = do_ensure_installed(NameVsn);
- {error, NodeErrors} when Nodes =/= [] ->
- ?SLOG(error, #{
- msg => "failed_to_copy_plugin_from_other_nodes",
- name_vsn => NameVsn,
- node_errors => NodeErrors
- }),
- {error, plugin_not_found};
- {error, _} ->
- ?SLOG(error, #{
- msg => "no_nodes_to_copy_plugin_from",
- name_vsn => NameVsn
- }),
- {error, plugin_not_found}
- end.
- get_from_any_node([], _NameVsn, Errors) ->
- {error, Errors};
- get_from_any_node([Node | T], NameVsn, Errors) ->
- case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
- {ok, _} = Res ->
- Res;
- Err ->
- get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
- end.
- plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
- case file:read_file(readme_file(NameVsn)) of
- {ok, Bin} -> Info#{readme => Bin};
- _ -> Info#{readme => <<>>}
- end;
- plugins_readme(_NameVsn, _Options, Info) ->
- Info.
- plugin_status(NameVsn, Info) ->
- {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
- RunningSt =
- case application:get_key(AppName, vsn) of
- {ok, _} ->
- case lists:keyfind(AppName, 1, running_apps()) of
- {AppName, _} -> running;
- _ -> loaded
- end;
- undefined ->
- stopped
- end,
- Configured = lists:filtermap(
- fun(#{name_vsn := Nv, enable := St}) ->
- case bin(Nv) =:= bin(NameVsn) of
- true -> {true, St};
- false -> false
- end
- end,
- configured()
- ),
- ConfSt =
- case Configured of
- [] -> not_configured;
- [true] -> enabled;
- [false] -> disabled
- end,
- Info#{
- running_status => RunningSt,
- config_status => ConfSt
- }.
- bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
- bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
- bin(B) when is_binary(B) -> B.
- check_plugin(
- #{
- <<"name">> := Name,
- <<"rel_vsn">> := Vsn,
- <<"rel_apps">> := Apps,
- <<"description">> := _
- } = Info,
- NameVsn,
- File
- ) ->
- case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
- true ->
- try
- %% assert
- [_ | _] = Apps,
- %% validate if the list is all <app>-<vsn> strings
- lists:foreach(fun(App) -> {ok, _, _} = parse_name_vsn(App) end, Apps)
- catch
- _:_ ->
- throw(#{
- error => "bad_rel_apps",
- rel_apps => Apps,
- hint => "A non-empty string list of app_name-app_vsn format"
- })
- end,
- Info;
- false ->
- throw(#{
- error => "name_vsn_mismatch",
- name_vsn => NameVsn,
- path => File,
- name => Name,
- rel_vsn => Vsn
- })
- end;
- check_plugin(_What, NameVsn, File) ->
- throw(#{
- error => "bad_info_file_content",
- mandatory_fields => [rel_vsn, name, rel_apps, description],
- name_vsn => NameVsn,
- path => File
- }).
- load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
- LibDir = filename:join([install_dir(), RelNameVsn]),
- RunningApps = running_apps(),
- %% load plugin apps and beam code
- AppNames =
- lists:map(
- fun(AppNameVsn) ->
- {ok, AppName, AppVsn} = parse_name_vsn(AppNameVsn),
- EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
- ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
- AppName
- end,
- Apps
- ),
- lists:foreach(fun start_app/1, AppNames).
- load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
- case lists:keyfind(AppName, 1, RunningApps) of
- false ->
- do_load_plugin_app(AppName, Ebin);
- {_, Vsn} ->
- case bin(Vsn) =:= bin(AppVsn) of
- true ->
- %% already started on the exact version
- ok;
- false ->
- %% running but a different version
- ?SLOG(warning, #{
- msg => "plugin_app_already_running",
- name => AppName,
- running_vsn => Vsn,
- loading_vsn => AppVsn
- })
- end
- end.
- do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
- do_load_plugin_app(AppName, binary_to_list(Ebin));
- do_load_plugin_app(AppName, Ebin) ->
- _ = code:add_patha(Ebin),
- Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
- lists:foreach(
- fun(BeamFile) ->
- Module = list_to_atom(filename:basename(BeamFile, ".beam")),
- _ = code:purge(Module),
- case code:load_file(Module) of
- {module, _} ->
- ok;
- {error, Reason} ->
- throw(#{
- error => "failed_to_load_plugin_beam",
- path => BeamFile,
- reason => Reason
- })
- end
- end,
- Modules
- ),
- case application:load(AppName) of
- ok ->
- ok;
- {error, {already_loaded, _}} ->
- ok;
- {error, Reason} ->
- throw(#{
- error => "failed_to_load_plugin_app",
- name => AppName,
- reason => Reason
- })
- end.
- start_app(App) ->
- case application:ensure_all_started(App) of
- {ok, Started} ->
- case Started =/= [] of
- true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
- false -> ok
- end,
- ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
- ok;
- {error, {ErrApp, Reason}} ->
- throw(#{
- error => "failed_to_start_plugin_app",
- app => App,
- err_app => ErrApp,
- reason => Reason
- })
- end.
- %% Stop all apps installed by the plugin package,
- %% but not the ones shared with others.
- ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
- %% load plugin apps and beam code
- AppsToStop =
- lists:map(
- fun(NameVsn) ->
- {ok, AppName, _AppVsn} = parse_name_vsn(NameVsn),
- AppName
- end,
- Apps
- ),
- case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
- {ok, []} ->
- %% all apps stopped
- ok;
- {ok, Left} ->
- ?SLOG(warning, #{
- msg => "unabled_to_stop_plugin_apps",
- apps => Left,
- reason => "running_apps_still_depends_on_this_apps"
- }),
- ok;
- {error, Reason} ->
- {error, Reason}
- end.
- stop_apps(Apps) ->
- RunningApps = running_apps(),
- case do_stop_apps(Apps, [], RunningApps) of
- %% all stopped
- {ok, []} -> {ok, []};
- %% no progress
- {ok, Remain} when Remain =:= Apps -> {ok, Apps};
- %% try again
- {ok, Remain} -> stop_apps(Remain)
- end.
- do_stop_apps([], Remain, _AllApps) ->
- {ok, lists:reverse(Remain)};
- do_stop_apps([App | Apps], Remain, RunningApps) ->
- case is_needed_by_any(App, RunningApps) of
- true ->
- do_stop_apps(Apps, [App | Remain], RunningApps);
- false ->
- ok = stop_app(App),
- do_stop_apps(Apps, Remain, RunningApps)
- end.
- stop_app(App) ->
- case application:stop(App) of
- ok ->
- ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
- ok = unload_moudle_and_app(App);
- {error, {not_started, App}} ->
- ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
- ok = unload_moudle_and_app(App);
- {error, Reason} ->
- throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
- end.
- unload_moudle_and_app(App) ->
- case application:get_key(App, modules) of
- {ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
- _ -> ok
- end,
- _ = application:unload(App),
- ok.
- is_needed_by_any(AppToStop, RunningApps) ->
- lists:any(
- fun({RunningApp, _RunningAppVsn}) ->
- is_needed_by(AppToStop, RunningApp)
- end,
- RunningApps
- ).
- is_needed_by(AppToStop, AppToStop) ->
- false;
- is_needed_by(AppToStop, RunningApp) ->
- case application:get_key(RunningApp, applications) of
- {ok, Deps} -> lists:member(AppToStop, Deps);
- undefined -> false
- end.
- put_config(Key, Value) ->
- put_config(Key, Value, _ConfLocation = local).
- put_config(Key, Value, ConfLocation) when is_atom(Key) ->
- put_config([Key], Value, ConfLocation);
- put_config(Path, Values, _ConfLocation = local) when is_list(Path) ->
- Opts = #{rawconf_with_defaults => true, override_to => cluster},
- %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
- case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
- {ok, _} -> ok;
- Error -> Error
- end;
- put_config(Path, Values, _ConfLocation = global) when is_list(Path) ->
- Opts = #{rawconf_with_defaults => true, override_to => cluster},
- case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
- {ok, _} -> ok;
- Error -> Error
- end.
- bin_key(Map) when is_map(Map) ->
- maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
- bin_key(List = [#{} | _]) ->
- lists:map(fun(M) -> bin_key(M) end, List);
- bin_key(Term) ->
- Term.
- get_config(Key, Default) when is_atom(Key) ->
- get_config([Key], Default);
- get_config(Path, Default) ->
- emqx_conf:get([?CONF_ROOT | Path], Default).
- install_dir() -> get_config(install_dir, "").
- put_configured(Configured) ->
- put_configured(Configured, _ConfLocation = local).
- put_configured(Configured, ConfLocation) ->
- ok = put_config(states, bin_key(Configured), ConfLocation).
- configured() ->
- get_config(states, []).
- for_plugins(ActionFun) ->
- case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
- [] -> ok;
- Errors -> erlang:error(#{function => ActionFun, errors => Errors})
- end.
- for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
- case Fun(NameVsn) of
- ok -> [];
- {error, Reason} -> [{NameVsn, Reason}]
- end;
- for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
- ?SLOG(debug, #{
- msg => "plugin_disabled",
- name_vsn => NameVsn
- }),
- [].
- parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
- parse_name_vsn(binary_to_list(NameVsn));
- parse_name_vsn(NameVsn) when is_list(NameVsn) ->
- case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
- {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
- _ -> {error, "bad_name_vsn"}
- end.
- pkg_file(NameVsn) ->
- filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
- dir(NameVsn) ->
- filename:join([install_dir(), NameVsn]).
- info_file(NameVsn) ->
- filename:join([dir(NameVsn), "release.json"]).
- readme_file(NameVsn) ->
- filename:join([dir(NameVsn), "README.md"]).
- running_apps() ->
- lists:map(
- fun({N, _, V}) ->
- {N, V}
- end,
- application:which_applications(infinity)
- ).
- %%--------------------------------------------------------------------
- %% `emqx_config_handler' API
- %%--------------------------------------------------------------------
- post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
- NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
- OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
- #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
- maps:foreach(fun enable_disable_plugin/2, Changed),
- ok;
- post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
- ok.
- enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
- %% errors are already logged in this fn
- _ = ensure_stopped(NameVsn),
- ok;
- enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
- %% errors are already logged in this fn
- _ = ensure_started(NameVsn),
- ok;
- enable_disable_plugin(_NameVsn, _Diff) ->
- ok.
|