Feng 10 лет назад
Родитель
Сommit
1a64e4e373

+ 12 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src

@@ -0,0 +1,12 @@
+{application, emqttd_plugin_demo,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { emqttd_plugin_demo_app, []}},
+  {env, []}
+ ]}.

+ 16 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl

@@ -0,0 +1,16 @@
+-module(emqttd_plugin_demo_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    emqttd_plugin_demo_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 27 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl

@@ -0,0 +1,27 @@
+-module(emqttd_plugin_demo_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, []} }.
+

+ 19 - 7
src/emqttd_app.erl

@@ -52,9 +52,8 @@ start(_StartType, _StartArgs) ->
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     emqttd:load_all_mods(),
-    %% emqttd:load_all_plugins(),
-    {ok, Listeners} = application:get_env(listeners),
-    emqttd:open_listeners(Listeners),
+    emqttd_plugins:load(),
+    start_listeners(),
     register(emqttd, self()),
     print_vsn(),
     {ok, Sup}.
@@ -67,6 +66,10 @@ print_vsn() ->
     {ok, Desc} = application:get_key(description),
     ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
 
+start_listeners() ->
+    {ok, Listeners} = application:get_env(listeners),
+    emqttd:open_listeners(Listeners).
+
 start_servers(Sup) ->
     Servers = [{"emqttd trace", emqttd_trace},
                {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
@@ -131,14 +134,23 @@ worker_spec(Name, Opts) ->
 
 %% close all listeners first...
 prep_stop(State) ->
-    %%TODO: esockd app should be running...
-    {ok, Listeners} = application:get_env(listeners),
-    emqttd:close_listeners(Listeners),
+    stop_listeners(), 
+    timer:sleep(2),
+    emqttd_plugins:unload(),
     timer:sleep(2),
     State.
 
+stop_listeners() ->
+    %% ensure that esockd applications is started?
+    case lists:keyfind(esockd, 1, application:which_applications()) of
+        false  ->
+            ignore;
+        _Tuple ->
+            {ok, Listeners} = application:get_env(listeners),
+            emqttd:close_listeners(Listeners)
+    end.
+
 -spec stop(State :: term()) -> term().
 stop(_State) ->
     ok.
 
-

+ 8 - 9
src/emqttd_ctl.erl

@@ -77,8 +77,6 @@ cluster([SNode]) ->
     pong ->
         case emqttd:is_running(Node) of
             true ->
-                %%TODO: should not unload here.
-                %% emqttd:unload_all_plugins(),
                 application:stop(emqttd),
                 application:stop(esockd),
                 application:stop(gproc),
@@ -180,19 +178,20 @@ bridges(["stop", SNode, Topic]) ->
     end.
 
 plugins(["list"]) ->
-    Plugins = emqttd:loaded_plugins(),
-    lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins);
+    lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
+            ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active])
+        end, emqttd_plugins:list());
 
 plugins(["load", Name]) ->
-    case emqttd:load_plugin(list_to_atom(Name)) of
-        ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]);
-        {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
+    case emqttd_plugins:load(list_to_atom(Name)) of
+        {ok, StartedApps} -> ?PRINT("start apps: ~p, plugin ~s is loaded successfully.~n", [StartedApps, Name]);
+        {error, Reason} -> ?PRINT("load plugin error: ~s~n", [Reason])
     end;
 
 plugins(["unload", Name]) ->
-    case emqttd:unload_plugin(list_to_atom(Name)) of
+    case emqttd_plugins:unload(list_to_atom(Name)) of
         ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]);
-        {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
+        {error, Reason} -> ?PRINT("unload plugin error: ~s~n", [Reason])
     end.
 
 trace(["list"]) ->

+ 0 - 172
src/emqttd_plugin_manager.erl

@@ -1,172 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd plugin manager.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
-
--module(emqttd_plugin_manager).
-
--author("Feng Lee <feng@emqtt.io>").
-
--include("emqttd.hrl").
-
--export([start/0, list/0, load/1, unload/1, stop/0]).
-
-start() ->
-    %% start all plugins
-    %%
-    ok.
-
-%%------------------------------------------------------------------------------
-%% @doc Load all plugins
-%% @end
-%%------------------------------------------------------------------------------
--spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
-load_all_plugins() ->
-    %% save first
-    case file:consult("etc/plugins.config") of
-        {ok, [PluginApps]} ->
-            ok;
-            %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]),
-            %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps];
-        {error, enoent} ->
-            lager:error("etc/plugins.config not found!");
-        {error, Error} ->
-            lager:error("Load etc/plugins.config error: ~p", [Error])
-    end.
-
-%%------------------------------------------------------------------------------
-%% List all available plugins
-%%------------------------------------------------------------------------------
-list() ->
-    {ok, PluginEnv} = application:get_env(emqttd, plugins),
-    PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"),
-    AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir),
-    Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles],
-    StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()],
-    lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
-                      case lists:member(Name, StartedApps) of
-                          true  -> Plugin#mqtt_plugin{active = true};
-                          false -> Plugin
-                      end
-              end, Plugins).
-
-plugin(AppFile) ->
-    {ok, [{application, Name, Attrs}]} = file:consult(AppFile),
-    Ver = proplists:get_value(vsn, Attrs),
-    Descr = proplists:get_value(description, Attrs, ""),
-    #mqtt_plugin{name = Name, version = Ver, descr = Descr}.
-
-%%------------------------------------------------------------------------------
-%% @doc Load Plugin
-%% @end
-%%------------------------------------------------------------------------------
--spec load(atom()) -> ok | {error, any()}.
-load(PluginName) when is_atom(PluginName) ->
-    %% start plugin
-    %% write file if plugin is loaded
-    ok.
-
--spec load_plugin(App :: atom()) -> ok | {error, any()}.
-load_plugin(App) ->
-    case load_app(App) of
-        ok ->
-            start_app(App);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-load_app(App) ->
-    case application:load(App) of
-        ok ->
-            lager:info("load plugin ~p successfully", [App]), ok;
-        {error, {already_loaded, App}} ->
-            lager:info("load plugin ~p is already loaded", [App]), ok;
-        {error, Reason} ->
-            lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-start_app(App) ->
-    case application:start(App) of
-        ok ->
-            lager:info("start plugin ~p successfully", [App]), ok;
-        {error, {already_started, App}} ->
-            lager:error("plugin ~p is already started", [App]), ok;
-        {error, Reason} ->
-            lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-
-%%------------------------------------------------------------------------------
-%% @doc UnLoad Plugin
-%% @end
-%%------------------------------------------------------------------------------
--spec unload(atom()) -> ok | {error, any()}.
-unload(PluginName) when is_atom(PluginName) ->
-    %% stop plugin
-    %% write file if plugin is loaded
-    ok.
-
--spec unload_plugin(App :: atom()) -> ok | {error, any()}.
-unload_plugin(App) ->
-    case stop_app(App) of
-        ok ->
-            unload_app(App);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-    
-stop_app(App) ->
-    case application:stop(App) of
-        ok ->
-            lager:info("stop plugin ~p successfully~n", [App]), ok;
-        {error, {not_started, App}} ->
-            lager:error("plugin ~p is not started~n", [App]), ok;
-        {error, Reason} ->
-            lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
-    end.
-
-unload_app(App) ->
-    case application:unload(App) of
-        ok ->
-            lager:info("unload plugin ~p successfully~n", [App]), ok;
-        {error, {not_loaded, App}} ->
-            lager:info("load plugin ~p is not loaded~n", [App]), ok;
-        {error, Reason} ->
-            lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-stop() ->
-    %% stop all plugins
-    ok.
-
-%%------------------------------------------------------------------------------
-%% @doc Unload all plugins
-%% @end
-%%------------------------------------------------------------------------------
--spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
-unload_all_plugins() ->
-    PluginApps = application:get_env(emqttd, plugins, []).
-    %%[{App, unload_plugin(App)} || App <- PluginApps].
-

+ 83 - 63
src/emqttd_plugin_mgr.erl

@@ -20,53 +20,75 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% emqttd plugin manager.
+%%% emqttd plugin admin.
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
 
--module(emqttd_plugin_mgr).
+-module(emqttd_plugins).
 
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
--export([start/0, list/0, load/1, unload/1, stop/0]).
+-export([load/0, unload/0]).
+
+-export([list/0, load/1, unload/1]).
 
 %%------------------------------------------------------------------------------
-%% @doc Load all plugins
+%% @doc Load all plugins when the broker started.
 %% @end
 %%------------------------------------------------------------------------------
--spec start() -> ok | {error, any()}.
-start() ->
+-spec load() -> list() | {error, any()}.
+load() ->
     case read_loaded() of
-        {ok, AppNames} ->
-            NotFound = AppNames -- apps(plugin),
+        {ok, LoadNames} ->
+            NotFound = LoadNames -- apps(plugin),
             case NotFound of
                 [] -> ok;
                 NotFound -> lager:error("Cannot find plugins: ~p", [NotFound])
             end,
-            {ok, start_apps(AppNames -- NotFound -- apps(started))};
+            start_apps(LoadNames -- NotFound -- apps(started));
+        {error, Error} ->
+            lager:error("Read loaded_plugins file error: ~p", [Error]),
+            {error, Error}
+    end.
+
+start_apps(Apps) ->
+    [start_app(App) || App <- Apps].
+
+%%------------------------------------------------------------------------------
+%% @doc Unload all plugins before broker stopped.
+%% @end
+%%------------------------------------------------------------------------------
+-spec unload() -> list() | {error, any()}.
+unload() ->
+    case read_loaded() of
+        {ok, LoadNames} ->
+            stop_apps(LoadNames);
         {error, Error} ->
             lager:error("Read loaded_plugins file error: ~p", [Error]),
             {error, Error}
     end.
 
+stop_apps(Apps) ->
+    [stop_app(App) || App <- Apps].
+
 %%------------------------------------------------------------------------------
 %% @doc List all available plugins
 %% @end
 %%------------------------------------------------------------------------------
+-spec list() -> [mqtt_plugin()].
 list() ->
-    {ok, PluginEnv} = application:get_env(emqttd, plugins),
-    PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"),
+    PluginsDir = env(dir),
     AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir),
     Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles],
-    StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()],
+    StartedApps = apps(started),
     lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
-                      case lists:member(Name, StartedApps) of
-                          true  -> Plugin#mqtt_plugin{active = true};
-                          false -> Plugin
-                      end
+                  case lists:member(Name, StartedApps) of
+                      true  -> Plugin#mqtt_plugin{active = true};
+                      false -> Plugin
+                  end
               end, Plugins).
 
 plugin(AppFile) ->
@@ -76,23 +98,20 @@ plugin(AppFile) ->
     #mqtt_plugin{name = Name, version = Ver, descr = Descr}.
 
 %%------------------------------------------------------------------------------
-%% @doc Load Plugin
+%% @doc Load One Plugin
 %% @end
 %%------------------------------------------------------------------------------
 -spec load(atom()) -> ok | {error, any()}.
 load(PluginName) when is_atom(PluginName) ->
-    case lists:member(PluginName, apps(started)) of
-        true ->
-            lager:info("plugin ~p is started", [PluginName]),
+    case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of
+        {true, _} ->
+            lager:error("plugin ~p is started", [PluginName]),
             {error, already_started};
-        false ->
-            case lists:member(PluginName, apps(plugin)) of
-                true ->
-                    load_plugin(PluginName);
-                false ->
-                    lager:info("plugin ~p is not found", [PluginName]),
-                    {error, not_foun}
-            end
+        {false, true} ->
+            load_plugin(PluginName);
+        {false, false} ->
+            lager:error("plugin ~p is not found", [PluginName]),
+            {error, not_found}
     end.
     
 -spec load_plugin(App :: atom()) -> {ok, list()} | {error, any()}.
@@ -116,20 +135,27 @@ start_app(App) ->
     end.
 
 %%------------------------------------------------------------------------------
-%% @doc UnLoad Plugin
+%% @doc UnLoad One Plugin
 %% @end
 %%------------------------------------------------------------------------------
 -spec unload(atom()) -> ok | {error, any()}.
 unload(PluginName) when is_atom(PluginName) ->
-    %% stop plugin
-    %% write file if plugin is loaded
-    ok.
+    case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of
+        {false, _} ->
+            lager:error("plugin ~p is not started", [PluginName]),
+            {error, not_started};
+        {true, true} ->
+            unload_plugin(PluginName);
+        {true, false} ->
+            lager:error("~s is not a plugin, cannot unload it", [PluginName]),
+            {error, not_found}
+    end.
 
 -spec unload_plugin(App :: atom()) -> ok | {error, any()}.
 unload_plugin(App) ->
     case stop_app(App) of
         ok ->
-            unload_app(App);
+            plugin_unloaded(App), ok;
         {error, Reason} ->
             {error, Reason}
     end.
@@ -144,32 +170,10 @@ stop_app(App) ->
             lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
     end.
 
-unload_app(App) ->
-    case application:unload(App) of
-        ok ->
-            lager:info("unload plugin ~p successfully~n", [App]), ok;
-        {error, {not_loaded, App}} ->
-            lager:info("load plugin ~p is not loaded~n", [App]), ok;
-        {error, Reason} ->
-            lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-stop() ->
-    %% stop all plugins
-    PluginApps = application:get_env(emqttd, plugins, []),
-    %%[{App, unload_plugin(App)} || App <- PluginApps].
-    ok.
-
 %%%=============================================================================
 %%% Internal functions
 %%%=============================================================================
 
-start_apps(Apps) ->
-    [start_app(App) || App <- Apps].
-
-stop_apps(Apps) ->
-    [stop_app(App) || App <- Apps].
-
 apps(plugin) ->
     [Name || #mqtt_plugin{name = Name} <- list()];
 
@@ -181,7 +185,7 @@ plugin_loaded(Name) ->
         {ok, Names} ->
             case lists:member(Name, Names) of
                 true ->
-                    ok;
+                    ignore;
                 false ->
                     %% write file if plugin is loaded
                     write_loaded(lists:append(Names, Name))
@@ -190,17 +194,24 @@ plugin_loaded(Name) ->
             lager:error("Cannot read loaded plugins: ~p", [Error])
     end.
 
-
+plugin_unloaded(Name) ->
+    case read_loaded() of
+        {ok, Names} ->
+            case lists:member(Name, Names) of
+                true ->
+                    write_loaded(lists:delete(Name, Names));
+                false ->
+                    lager:error("Cannot find ~s in loaded_file", [Name])
+            end;
+        {error, Error} ->
+            lager:error("Cannot read loaded plugins: ~p", [Error])
+    end.
 
 read_loaded() ->
-    {ok, PluginEnv} = application:get_env(emqttd, plugins),
-    LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"),
-    file:consult(LoadedFile).
+    file:consult(env(loaded_file)).
 
 write_loaded(AppNames) ->
-    {ok, PluginEnv} = application:get_env(emqttd, plugins),
-    LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"),
-    case file:open(LoadedFile, [binary, write]) of
+    case file:open(env(loaded_file), [binary, write]) of
         {ok, Fd} ->
             Line = list_to_binary(io_lib:format("~w.~n", [AppNames])),
             file:write(Fd, Line);
@@ -208,3 +219,12 @@ write_loaded(AppNames) ->
             {error, Error}
     end.
 
+env(dir) ->
+    proplists:get_value(dir, env(), "./plugins");
+
+env(loaded_file) ->
+    proplists:get_value(loaded_file, env(), "./data/loaded_plugins").
+
+env() ->
+    {ok, PluginsEnv} = application:get_env(emqttd, plugins), PluginsEnv.
+