فهرست منبع

Merge pull request #232 from emqtt/dev

New Plugin Architecture Support
Feng Lee 10 سال پیش
والد
کامیت
7d52d8ac79

+ 6 - 0
.gitmodules

@@ -1,3 +1,9 @@
 [submodule "plugins/emqttd_dashboard"]
 	path = plugins/emqttd_dashboard
 	url = https://github.com/emqtt/emqttd_dashboard.git
+[submodule "plugins/emysql"]
+	path = plugins/emysql
+	url = https://github.com/erylee/emysql.git
+[submodule "plugins/emqttd_plugin_mysql"]
+	path = plugins/emqttd_plugin_mysql
+	url = https://github.com/emqtt/emqttd_plugin_mysql.git

+ 7 - 4
Makefile

@@ -29,11 +29,14 @@ edoc:
 	@$(REBAR) doc
 
 rel: compile
-	@cd rel && ../rebar generate -f
+	@cd rel && $(REBAR) generate -f
 
 plugins:
 	@for plugin in ./plugins/* ; do \
+	if [ -d $${plugin} ]; then \
+		echo "copy $${plugin}"; \
 		cp -R $${plugin} $(DIST)/plugins/ && rm -rf $(DIST)/$${plugin}/src/ ; \
+	fi \
 	done
 
 dist: rel plugins
@@ -44,12 +47,12 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \
 
 check_plt: compile
 	dialyzer --check_plt --plt $(PLT) --apps $(APPS) \
-		deps/*/ebin apps/*/ebin
+		deps/*/ebin ./ebin
 
 build_plt: compile
 	dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \
-		deps/*/ebin apps/*/ebin
+		deps/*/ebin ./ebin
 
 dialyzer: compile
-	dialyzer -Wno_return --plt $(PLT) deps/*/ebin apps/*/ebin
+	dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin
 

+ 4 - 0
PLUGIN.md

@@ -0,0 +1,4 @@
+
+Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design).
+
+

+ 26 - 10
include/emqttd.hrl

@@ -129,28 +129,44 @@
 
 -type mqtt_message() :: #mqtt_message{}.
 
+%%------------------------------------------------------------------------------
+%% MQTT Alarm
+%%------------------------------------------------------------------------------
+-record(mqtt_alarm, {
+    id          :: binary(),
+    severity    :: warning | error | critical,
+    title       :: binary(),
+    summary     :: binary(),
+    timestamp   :: erlang:timestamp() %% Timestamp
+}).
+
+-type mqtt_alarm() :: #mqtt_alarm{}.
+
 %%------------------------------------------------------------------------------
 %% MQTT Plugin
 %%------------------------------------------------------------------------------
 -record(mqtt_plugin, {
     name,
     version,
-    attrs,
-    description
+    descr,
+    config,
+    active = false
 }).
 
 -type mqtt_plugin() :: #mqtt_plugin{}.
 
 %%------------------------------------------------------------------------------
-%% MQTT Alarm
+%% MQTT CLI Command
+%% For example: 'broker metrics'
 %%------------------------------------------------------------------------------
--record(mqtt_alarm, {
-    id          :: binary(),
-    severity    :: warning | error | critical,
-    title       :: binary(),
-    summary     :: binary(),
-    timestamp   :: erlang:timestamp() %% Timestamp
+-record(mqtt_cli, {
+    name,
+    action,
+    args = [],
+    opts = [],
+    usage,
+    descr
 }).
 
--type mqtt_alarm() :: #mqtt_alarm{}.
+-type mqtt_cli() :: #mqtt_cli{}.
 

+ 1 - 0
plugins/emqttd_dashboard

@@ -0,0 +1 @@
+Subproject commit 2d3c9aeabeb5289b9ae27c503f017ad71bd81174

+ 1 - 0
plugins/emqttd_plugin_demo/etc/plugin.config

@@ -0,0 +1 @@
+[].

+ 12 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src

@@ -0,0 +1,12 @@
+{application, emqttd_plugin_demo,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { emqttd_plugin_demo_app, []}},
+  {env, []}
+ ]}.

+ 16 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl

@@ -0,0 +1,16 @@
+-module(emqttd_plugin_demo_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    emqttd_plugin_demo_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 27 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl

@@ -0,0 +1,27 @@
+-module(emqttd_plugin_demo_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, []} }.
+

+ 1 - 0
plugins/emqttd_plugin_mysql

@@ -0,0 +1 @@
+Subproject commit 04baf44a465c1513e75dfdcc2f6507e7a315d2d5

+ 1 - 0
plugins/emysql

@@ -0,0 +1 @@
+Subproject commit 38927104b44b3f8d237bcf3a2b50f2e0608291b3

+ 4 - 0
rebar.config

@@ -18,6 +18,10 @@
 
 {validate_app_modules, true}.
 
+%% plugins cannot find emqttd.hrl without ".." lib dirs:(
+%% but this setting will make deps apps collision
+{lib_dirs, ["../"]}.
+
 {sub_dirs, [
     "rel",
     "plugins/*/"]}.

+ 1 - 22
rel/files/emqttd

@@ -141,13 +141,6 @@ case "$1" in
             echo $RES
             exit 1
         fi
-        # Sanity check the plugins.config file
-        RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config`
-        if [ $? != 0 ]; then
-            echo "Error reading $RUNNER_ETC_DIR/plugins.config"
-            echo $RES
-            exit 1
-        fi
         HEART_COMMAND="$RUNNER_SCRIPT_DIR/$SCRIPT start"
         export HEART_COMMAND
         mkdir -p $PIPE_DIR
@@ -265,13 +258,6 @@ case "$1" in
             echo $RES
             exit 1
         fi
-        # Sanity check the plugins.config file
-        RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config`
-        if [ $? != 0 ]; then
-            echo "Error reading $RUNNER_ETC_DIR/plugins.config"
-            echo $RES
-            exit 1
-        fi
         # Setup beam-required vars
         ROOTDIR=$RUNNER_BASE_DIR
         ERL_LIBS=$ROOTDIR/plugins
@@ -279,7 +265,7 @@ case "$1" in
         EMU=beam
         PROGNAME=`echo $0 | sed 's/.*\///'`
         CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$SCRIPT \
-            -embedded -config $RUNNER_ETC_DIR/emqttd.config -config $RUNNER_ETC_DIR/plugins.config \
+            -embedded -config $RUNNER_ETC_DIR/emqttd.config \
             -pa $RUNNER_LIB_DIR/basho-patches \
             -args_file $RUNNER_ETC_DIR/vm.args -- ${1+"$@"}"
         export EMU
@@ -305,13 +291,6 @@ case "$1" in
             echo $RES
             exit 1
         fi
-        # Sanity check the plugins.config file
-        RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config`
-        if [ $? != 0 ]; then
-            echo "Error reading $RUNNER_ETC_DIR/plugins.config"
-            echo $RES
-            exit 1
-        fi
         echo "config is OK"
         ;;
     escript)

+ 10 - 2
rel/files/emqttd.config

@@ -153,15 +153,23 @@
     {modules, [
         %% Client presence management module.
         %% Publish messages when client connected or disconnected
-        {presence, [{qos, 0}]},
+        {presence, [{qos, 0}]}
 
         %% Subscribe topics automatically when client connected
-        {autosub, [{"$Q/client/$c", 0}]}
+        %% {autosub, [{"$Q/client/$c", 0}]}
 
         %% Rewrite rules
         %% {rewrite, [{file, "etc/rewrite.config"}]}
         
     ]},
+    %% Plugins
+    {plugins, [
+        %% Plugin App Library Dir
+        {plugins_dir, "./plugins"},
+
+        %% File to store loaded plugin names.
+        {loaded_file, "./data/loaded_plugins"}
+    ]},
     %% Listeners
     {listeners, [
         {mqtt, 1883, [

+ 1 - 0
rel/files/loaded_plugins

@@ -0,0 +1 @@
+emqttd_dashboard.

+ 0 - 26
rel/files/plugins.config

@@ -1,26 +0,0 @@
-[
-% {emysql, [
-%    {pool_size, 4},
-%    {host, "localhost"},
-%    {port, 3306},
-%    {username, "root"}, 
-%    {password, "public"},
-%    {database, "mqtt"},
-%    {encoding, utf8}
-% ]},
-% {emqttd_auth_mysql, [
-%     {user_table, mqtt_users},
-%     {password_hash, plain},
-%     {field_mapper, [
-%         {username, username},
-%         {password, password}
-%     ]}
-% ]},
-% {emqttd_dashboard, [
-%    {listener, 
-%        {emqttd_dashboard, 18083, [
-%            {acceptors, 4},
-%            {max_clients, 512}]}}
-% ]}
-%
-].

+ 2 - 2
rel/reltool.config

@@ -87,6 +87,6 @@
            {template, "files/acl.config", "etc/acl.config"},
            {template, "files/rewrite.config", "etc/rewrite.config"},
            {template, "files/clients.config", "etc/clients.config"},
-           {template, "files/plugins.config", "etc/plugins.config"},
-           {template, "files/vm.args", "etc/vm.args"}
+           {template, "files/vm.args", "etc/vm.args"},
+           {copy, "files/loaded_plugins", "data/loaded_plugins"}
           ]}.

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.9.3"},
+  {vsn, "0.10.0"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 0 - 103
src/emqttd.erl

@@ -30,10 +30,7 @@
 
 -export([start/0, env/1, env/2,
          open_listeners/1, close_listeners/1,
-         load_all_plugins/0, unload_all_plugins/0,
-         load_plugin/1, unload_plugin/1,
          load_all_mods/0, is_mod_enabled/1,
-         loaded_plugins/0,
          is_running/1]).
 
 -define(MQTT_SOCKOPTS, [
@@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) ->
 close_listener({Protocol, Port, _Options}) ->
     esockd:close({Protocol, Port}).
 
-%%------------------------------------------------------------------------------
-%% @doc Load all plugins
-%% @end
-%%------------------------------------------------------------------------------
--spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
-load_all_plugins() ->
-    %% save first
-    case file:consult("etc/plugins.config") of
-        {ok, [PluginApps]} ->
-            application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]),
-            [{App, load_plugin(App)} || {App, _Env} <- PluginApps];
-        {error, enoent} ->
-            lager:error("etc/plugins.config not found!");
-        {error, Error} ->
-            lager:error("Load etc/plugins.config error: ~p", [Error])
-    end.
-
-%%------------------------------------------------------------------------------
-%% @doc Load plugin
-%% @end
-%%------------------------------------------------------------------------------
--spec load_plugin(App :: atom()) -> ok | {error, any()}.
-load_plugin(App) ->
-    case load_app(App) of
-        ok ->
-            start_app(App);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-load_app(App) ->
-    case application:load(App) of
-        ok ->
-            lager:info("load plugin ~p successfully", [App]), ok;
-        {error, {already_loaded, App}} ->
-            lager:info("load plugin ~p is already loaded", [App]), ok;
-        {error, Reason} ->
-            lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-start_app(App) ->
-    case application:start(App) of
-        ok ->
-            lager:info("start plugin ~p successfully", [App]), ok;
-        {error, {already_started, App}} ->
-            lager:error("plugin ~p is already started", [App]), ok;
-        {error, Reason} ->
-            lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
-
-%%------------------------------------------------------------------------------
-%% @doc Loaded plugins
-%% @end
-%%------------------------------------------------------------------------------
-loaded_plugins() ->
-    PluginApps = application:get_env(emqttd, plugins, []),
-    [App || App = {Name, _Descr, _Vsn} <- application:which_applications(),
-                                            lists:member(Name, PluginApps)].
-    
-%%------------------------------------------------------------------------------
-%% @doc Unload all plugins
-%% @end
-%%------------------------------------------------------------------------------
--spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
-unload_all_plugins() ->
-    PluginApps = application:get_env(emqttd, plugins, []),
-    [{App, unload_plugin(App)} || App <- PluginApps].
-
-%%------------------------------------------------------------------------------
-%% @doc Unload plugin
-%% @end
-%%------------------------------------------------------------------------------
--spec unload_plugin(App :: atom()) -> ok | {error, any()}.
-unload_plugin(App) ->
-    case stop_app(App) of
-        ok ->
-            unload_app(App);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-    
-stop_app(App) ->
-    case application:stop(App) of
-        ok ->
-            lager:info("stop plugin ~p successfully~n", [App]), ok;
-        {error, {not_started, App}} ->
-            lager:error("plugin ~p is not started~n", [App]), ok;
-        {error, Reason} ->
-            lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
-    end.
-
-unload_app(App) ->
-    case application:unload(App) of
-        ok ->
-            lager:info("unload plugin ~p successfully~n", [App]), ok;
-        {error, {not_loaded, App}} ->
-            lager:info("load plugin ~p is not loaded~n", [App]), ok;
-        {error, Reason} ->
-            lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
-    end.
 
 load_all_mods() ->
     Mods = application:get_env(emqttd, modules, []),

+ 19 - 7
src/emqttd_app.erl

@@ -52,9 +52,8 @@ start(_StartType, _StartArgs) ->
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     emqttd:load_all_mods(),
-    emqttd:load_all_plugins(),
-    {ok, Listeners} = application:get_env(listeners),
-    emqttd:open_listeners(Listeners),
+    emqttd_plugins:load(),
+    start_listeners(),
     register(emqttd, self()),
     print_vsn(),
     {ok, Sup}.
@@ -67,6 +66,10 @@ print_vsn() ->
     {ok, Desc} = application:get_key(description),
     ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
 
+start_listeners() ->
+    {ok, Listeners} = application:get_env(listeners),
+    emqttd:open_listeners(Listeners).
+
 start_servers(Sup) ->
     Servers = [{"emqttd trace", emqttd_trace},
                {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
@@ -131,14 +134,23 @@ worker_spec(Name, Opts) ->
 
 %% close all listeners first...
 prep_stop(State) ->
-    %%TODO: esockd app should be running...
-    {ok, Listeners} = application:get_env(listeners),
-    emqttd:close_listeners(Listeners),
+    stop_listeners(), 
+    timer:sleep(2),
+    emqttd_plugins:unload(),
     timer:sleep(2),
     State.
 
+stop_listeners() ->
+    %% ensure that esockd applications is started?
+    case lists:keyfind(esockd, 1, application:which_applications()) of
+        false  ->
+            ignore;
+        _Tuple ->
+            {ok, Listeners} = application:get_env(listeners),
+            emqttd:close_listeners(Listeners)
+    end.
+
 -spec stop(State :: term()) -> term().
 stop(_State) ->
     ok.
 
-

+ 10 - 11
src/emqttd_ctl.erl

@@ -68,7 +68,7 @@ status([]) ->
 %% @end
 %%------------------------------------------------------------------------------
 cluster([]) ->
-    Nodes = [node()|nodes()],
+    Nodes = emqttd_broker:running_nodes(),
     ?PRINT("cluster nodes: ~p~n", [Nodes]);
 
 cluster([SNode]) ->
@@ -77,8 +77,6 @@ cluster([SNode]) ->
     pong ->
         case emqttd:is_running(Node) of
             true ->
-                %%TODO: should not unload here.
-                emqttd:unload_all_plugins(),
                 application:stop(emqttd),
                 application:stop(esockd),
                 application:stop(gproc),
@@ -180,19 +178,20 @@ bridges(["stop", SNode, Topic]) ->
     end.
 
 plugins(["list"]) ->
-    Plugins = emqttd:loaded_plugins(),
-    lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins);
+    lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
+            ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active])
+        end, emqttd_plugins:list());
 
 plugins(["load", Name]) ->
-    case emqttd:load_plugin(list_to_atom(Name)) of
-        ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]);
-        {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
+    case emqttd_plugins:load(list_to_atom(Name)) of
+        {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
+        {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
     end;
 
 plugins(["unload", Name]) ->
-    case emqttd:unload_plugin(list_to_atom(Name)) of
-        ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]);
-        {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
+    case emqttd_plugins:unload(list_to_atom(Name)) of
+        ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
+        {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason])
     end.
 
 trace(["list"]) ->

+ 300 - 0
src/emqttd_plugins.erl

@@ -0,0 +1,300 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd plugins.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+
+-module(emqttd_plugins).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-include("emqttd.hrl").
+
+-export([load/0, unload/0]).
+
+-export([load/1, unload/1]).
+
+-export([list/0]).
+
+%%------------------------------------------------------------------------------
+%% @doc Load all plugins when the broker started.
+%% @end
+%%------------------------------------------------------------------------------
+
+-spec load() -> list() | {error, any()}.
+load() ->
+    case env(loaded_file) of
+        {ok, File} ->
+            with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);
+        undefined ->
+            %% No plugins available
+            ignore
+    end.
+
+with_loaded_file(File, SuccFun) ->
+    case read_loaded(File) of
+        {ok, Names} ->
+            SuccFun(Names);
+        {error, Error} ->
+            lager:error("Failed to read: ~p, error: ~p", [File, Error]),
+            {error, Error}
+    end.
+
+load_plugins(Names, Persistent) ->
+    Plugins = list(), NotFound = Names -- names(Plugins),
+    case NotFound of
+        [] -> ok;
+        NotFound -> lager:error("Cannot find plugins: ~p", [NotFound])
+    end,
+    NeedToLoad = Names -- NotFound -- names(started_app),
+    [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
+
+%%------------------------------------------------------------------------------
+%% @doc Unload all plugins before broker stopped.
+%% @end
+%%------------------------------------------------------------------------------
+-spec unload() -> list() | {error, any()}.
+unload() ->
+    case env(loaded_file) of
+        {ok, File} ->
+            with_loaded_file(File, fun(Names) -> stop_plugins(Names) end);
+        undefined ->
+            ignore
+    end.
+
+%% stop plugins
+stop_plugins(Names) ->
+    [stop_app(App) || App <- Names].
+
+%%------------------------------------------------------------------------------
+%% @doc List all available plugins
+%% @end
+%%------------------------------------------------------------------------------
+-spec list() -> [mqtt_plugin()].
+list() ->
+    case env(plugins_dir) of
+        {ok, PluginsDir} -> 
+            AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir),
+            Plugins = [plugin(PluginsDir, AppFile) || AppFile <- AppFiles],
+            StartedApps = names(started_app),
+            lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
+                          case lists:member(Name, StartedApps) of
+                              true  -> Plugin#mqtt_plugin{active = true};
+                              false -> Plugin
+                          end
+                      end, Plugins);
+        undefined ->
+            []
+    end.
+
+plugin(PluginsDir, AppFile0) ->
+    AppFile = filename:join(PluginsDir, AppFile0),
+    {ok, [{application, Name, Attrs}]} = file:consult(AppFile),
+    CfgFile = filename:join([PluginsDir, Name, "etc/plugin.config"]),
+    AppsEnv1 =
+    case filelib:is_file(CfgFile) of
+        true ->
+            {ok, [AppsEnv]} = file:consult(CfgFile),
+            AppsEnv;
+        false ->
+            []
+    end,
+    Ver = proplists:get_value(vsn, Attrs, "0"),
+    Descr = proplists:get_value(description, Attrs, ""),
+    #mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}.
+
+%%------------------------------------------------------------------------------
+%% @doc Load One Plugin
+%% @end
+%%------------------------------------------------------------------------------
+
+-spec load(atom()) -> ok | {error, any()}.
+load(PluginName) when is_atom(PluginName) ->
+    case lists:member(PluginName, names(started_app)) of
+        true ->
+            lager:error("Plugin ~p is already started", [PluginName]),
+            {error, already_started};
+        false ->
+            case find_plugin(PluginName) of
+                false ->
+                    lager:error("Plugin ~s not found", [PluginName]),
+                    {error, not_found};
+                Plugin ->
+                    load_plugin(Plugin, true)
+            end
+    end.
+
+load_plugin(#mqtt_plugin{name = Name, config = Config}, Persistent) ->
+    case load_app(Name, Config) of
+        ok ->
+            start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
+        {error, Error} ->
+            {error, Error}
+    end.
+
+load_app(App, Config) ->
+    case application:load(App) of
+        ok ->
+            set_config(Config);
+        {error, {already_loaded, App}} ->
+            set_config(Config);
+        {error, Error} ->
+            {error, Error}
+    end.
+
+%% This trick is awesome:)
+set_config([]) ->
+    ok;
+set_config([{AppName, Envs} | Config]) ->
+    [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs],
+    set_config(Config).
+
+start_app(App, SuccFun) ->
+    case application:ensure_all_started(App) of
+        {ok, Started} ->
+            lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]),
+            SuccFun(App),
+            {ok, Started};
+        {error, {ErrApp, Reason}} ->
+            lager:error("load plugin ~p error, cannot start app ~s for ~p", [App, ErrApp, Reason]),
+            {error, {ErrApp, Reason}}
+    end.
+
+find_plugin(Name) ->
+    find_plugin(Name, list()).
+
+find_plugin(Name, Plugins) ->
+    lists:keyfind(Name, 2, Plugins). 
+
+%%------------------------------------------------------------------------------
+%% @doc UnLoad One Plugin
+%% @end
+%%------------------------------------------------------------------------------
+-spec unload(atom()) -> ok | {error, any()}.
+unload(PluginName) when is_atom(PluginName) ->
+    case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
+        {true, true} ->
+            unload_plugin(PluginName, true);
+        {false, _} ->
+            lager:error("Plugin ~p is not started", [PluginName]),
+            {error, not_started};
+        {true, false} ->
+            lager:error("~s is not a plugin, cannot unload it", [PluginName]),
+            {error, not_found}
+    end.
+
+unload_plugin(App, Persistent) ->
+    case stop_app(App) of
+        ok ->
+            plugin_unloaded(App, Persistent), ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+    
+stop_app(App) ->
+    case application:stop(App) of
+        ok ->
+            lager:info("stop plugin ~p successfully~n", [App]), ok;
+        {error, {not_started, App}} ->
+            lager:error("plugin ~p is not started~n", [App]), ok;
+        {error, Reason} ->
+            lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
+    end.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+names(plugin) ->
+    names(list());
+
+names(started_app) ->
+    [Name || {Name, _Descr, _Ver} <- application:which_applications()];
+
+names(Plugins) ->
+    [Name || #mqtt_plugin{name = Name} <- Plugins].
+
+plugin_loaded(_Name, false) ->
+    ok;
+plugin_loaded(Name, true) ->
+    case read_loaded() of
+        {ok, Names} ->
+            case lists:member(Name, Names) of
+                false ->
+                    %% write file if plugin is loaded
+                    write_loaded(lists:append(Names, [Name]));
+                true ->
+                    ignore
+            end;
+        {error, Error} ->
+            lager:error("Cannot read loaded plugins: ~p", [Error])
+    end.
+
+plugin_unloaded(_Name, false) ->
+    ok;
+plugin_unloaded(Name, true) ->
+    case read_loaded() of
+        {ok, Names} ->
+            case lists:member(Name, Names) of
+                true ->
+                    write_loaded(lists:delete(Name, Names));
+                false ->
+                    lager:error("Cannot find ~s in loaded_file", [Name])
+            end;
+        {error, Error} ->
+            lager:error("Cannot read loaded_plugins: ~p", [Error])
+    end.
+
+read_loaded() ->
+    {ok, File} = env(loaded_file),
+    read_loaded(File). 
+
+read_loaded(File) ->
+    file:consult(File).
+
+write_loaded(AppNames) ->
+    {ok, File} = env(loaded_file),
+    case file:open(File, [binary, write]) of
+        {ok, Fd} ->
+            lists:foreach(fun(Name) ->
+                file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name])))
+            end, AppNames);
+        {error, Error} ->
+            lager:error("Open File ~p Error: ~p", [File, Error]),
+            {error, Error}
+    end.
+
+env(Name) ->
+    case application:get_env(emqttd, plugins) of
+        {ok, PluginsEnv} ->
+            case proplists:get_value(Name, PluginsEnv) of
+                undefined ->
+                    undefined;
+                Val ->
+                    {ok, Val}
+            end;
+        undefined ->
+            undefined
+    end.
+

+ 1 - 1
src/emqttd_pooler.erl

@@ -75,7 +75,7 @@ handle_call(_Req, _From, State) ->
     {reply, ok, State}.
 
 handle_cast({async_submit, Fun}, State) ->
-    run(Fun),
+    try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end,
     {noreply, State};
 
 handle_cast(_Msg, State) ->

+ 1 - 1
src/emqttd_pubsub.erl

@@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) ->
 %%%=============================================================================
 
 init([Id, _Opts]) ->
-    process_flag(min_heap_size, 1024*1024),
+    %%process_flag(min_heap_size, 1024*1024),
     gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
     {ok, #state{id = Id, submap = maps:new()}}.
 

+ 1 - 1
src/emqttd_session_sup.erl

@@ -57,6 +57,6 @@ start_session(CleanSess, ClientId, ClientPid) ->
 init([]) ->
     {ok, {{simple_one_for_one, 10, 10},
           [{session, {emqttd_session, start_link, []},
-              transient, 10000, worker, [emqttd_session]}]}}.
+              temporary, 10000, worker, [emqttd_session]}]}}.