Feng Lee 10 лет назад
Родитель
Сommit
bb2940cfb6

+ 2 - 0
apps/emqttd/src/emqttd_app.erl

@@ -52,6 +52,7 @@ start(_StartType, _StartArgs) ->
     {ok, Sup} = emqttd_sup:start_link(),
 	start_servers(Sup),
 	{ok, Listeners} = application:get_env(listeners),
+    emqttd_plugin_manager:load_all_plugins(),
     emqttd:open(Listeners),
 	register(emqttd, self()),
     print_vsn(),
@@ -137,5 +138,6 @@ worker_spec(Name, Opts) ->
 stop(_State) ->
 	{ok, Listeners} = application:get_env(listeners),
     emqttd:close(Listeners),
+    emqttd_plugin_manager:unload_all_plugins(),
     ok.
 

+ 4 - 7
apps/emqttd/src/emqttd_ctl.erl

@@ -155,20 +155,17 @@ bridges(["stop", SNode, Topic]) ->
     end.
 
 plugins(["list"]) ->
-    Plugins = emqttd_plugin_manager:list(),
-    lists:foreach(fun({Name, Attrs}) ->
-                ?PRINT("plugin ~s~n", [Name]),
-                [?PRINT("  ~s:~p~n", [Attr, Val]) || {Attr, Val} <- Attrs]
-        end, Plugins);
+    Plugins = emqttd_plugin_manager:loaded_plugins(),
+    lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins);
 
 plugins(["load", Name]) ->
-    case emqttd_plugin_manager:load(list_to_atom(Name)) of
+    case emqttd_plugin_manager:load_plugin(list_to_atom(Name)) of
         ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]);
         {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
     end;
 
 plugins(["unload", Name]) ->
-    case emqttd_plugin_manager:load(list_to_atom(Name)) of
+    case emqttd_plugin_manager:unload_plugin(list_to_atom(Name)) of
         ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]);
         {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
     end.

+ 0 - 74
apps/emqttd/src/emqttd_plugin.erl

@@ -1,74 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd plugin.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
--module(emqttd_plugin).
-
--author("Feng Lee <feng@emqtt.io>").
-
--behaviour(gen_server).
-
--define(SERVER, ?MODULE).
-
-%% API Function Exports
--export([start_link/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
-%% ------------------------------------------------------------------
-%% API Function Definitions
-%% ------------------------------------------------------------------
-
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%% ------------------------------------------------------------------
-%% gen_server Function Definitions
-%% ------------------------------------------------------------------
-init(Args) ->
-    {ok, Args}.
-
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%% ------------------------------------------------------------------
-%% Internal Function Definitions
-%% ------------------------------------------------------------------
-
-

+ 111 - 8
apps/emqttd/src/emqttd_plugin_manager.erl

@@ -26,28 +26,131 @@
 %%%-----------------------------------------------------------------------------
 -module(emqttd_plugin_manager).
 
+%%TODO: rewrite this module later...
+
 -author("Feng Lee <feng@emqtt.io>").
 
--export([list/0, load/1, unload/1]).
+-export([load_all_plugins/0, unload_all_plugins/0]).
+
+-export([load_plugin/1, unload_plugin/1]).
+
+-export([loaded_plugins/0]).
+
+%%------------------------------------------------------------------------------
+%% @doc Load all plugins
+%% @end
+%%------------------------------------------------------------------------------
+load_all_plugins() ->
+    {ok, [PluginApps]} = file:consult("etc/plugins.config"),
+    LoadedPlugins = load_plugins(PluginApps),
+    RunningPlugins = start_plugins(lists:reverse(LoadedPlugins)),
+    %% save to application env?
+    application:set_env(emqttd, loaded_plugins, lists:reverse(RunningPlugins)).
+
+load_plugins(PluginApps) ->
+    lists:foldl(fun({App, _Env}, Acc) ->
+                    case application:load(App) of
+                        ok ->
+                            io:format("load plugin ~p successfully~n", [App]),
+                            [App | Acc];
+                        {error, {already_loaded, App}} ->
+                            io:format("load plugin ~p is already loaded~n", [App]),
+                            [App | Acc];
+                        {error, Reason} ->
+                            io:format("load plugin ~p error: ~p~n", [App, Reason]),
+                            Acc
+                    end
+                end, [], PluginApps).
+
+start_plugins(PluginApps) ->
+    lists:foldl(fun(App, Acc) ->
+                    case application:start(App) of
+                        ok ->
+                            io:format("start plugin ~p successfully~n", [App]),
+                            [App | Acc];
+                        {error, {already_started, App}} ->
+                            io:format("plugin ~p is already started~n", [App]),
+                            [App | Acc];
+                        {error, Reason} ->
+                            io:format("start plugin ~p error: ~p~n", [App, Reason]),
+                            Acc
+                    end
+                end, [], PluginApps).
 
 %%------------------------------------------------------------------------------
-%% @doc List all loaded plugins
+%% @doc Loaded plugins
 %% @end
 %%------------------------------------------------------------------------------
-list() ->
-    [].
+loaded_plugins() ->
+    LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []),
+    lager:info("loaded plugins: ~p", [LoadedPluginApps]),
+    [App || App = {Name, _Descr, _Vsn} <- application:which_applications(),
+                                            lists:member(Name, LoadedPluginApps)].
+    
+%%------------------------------------------------------------------------------
+%% @doc Unload all plugins
+%% @end
+%%------------------------------------------------------------------------------
+unload_all_plugins() ->
+    LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []),
+    StoppedApps = stop_plugins(lists:reverse(LoadedPluginApps)),
+    UnloadedApps = unload_plugins(lists:reverse(StoppedApps)),
+    case LoadedPluginApps -- UnloadedApps of
+        [] -> 
+            application:unset_env(emqttd, loaded_plugins);
+        LeftApps -> 
+            lager:error("cannot unload plugins: ~p", [LeftApps]),
+            application:set_env(emqttd, loaded_plugins, LeftApps)
+    end.
+
+stop_plugins(PluginApps) ->
+    lists:foldl(fun(App, Acc) ->
+                    case application:stop(App) of
+                        ok ->
+                            io:format("stop plugin ~p successfully~n", [App]),
+                            [App | Acc];
+                        {error, {not_started, App}} ->
+                            io:format("plugin ~p is not started~n", [App]),
+                            [App | Acc];
+                        {error, Reason} ->
+                            io:format("stop plugin ~p error: ~p~n", [App, Reason]),
+                            Acc
+                    end
+                end, [], PluginApps).
+
+unload_plugins(PluginApps) ->
+    lists:foldl(fun({App, _Env}, Acc) ->
+                    case application:unload(App) of
+                        ok ->
+                            io:format("unload plugin ~p successfully~n", [App]),
+                            [App | Acc];
+                        {error, {not_loaded, App}} ->
+                            io:format("load plugin ~p is not loaded~n", [App]),
+                            [App | Acc];
+                        {error, Reason} ->
+                            io:format("unload plugin ~p error: ~p~n", [App, Reason]),
+                            Acc
+                    end
+                end, [], PluginApps).
 
 %%------------------------------------------------------------------------------
 %% @doc Load Plugin
 %% @end
 %%------------------------------------------------------------------------------
-load(Name) when is_atom(Name) ->
-    ok.
+load_plugin(Name) when is_atom(Name) ->
+    %% load app
+    %% start app
+    %% set env...
+    application:start(Name).
 
 %%------------------------------------------------------------------------------
 %% @doc Unload Plugin
 %% @end
 %%------------------------------------------------------------------------------
-unload(Name) when is_atom(Name) ->
-    ok.
+unload_plugin(Name) when is_atom(Name) ->
+    %% stop app
+    %% unload app
+    %% set env
+    application:stop(Name),
+    application:unload(Name).
 

+ 7 - 11
apps/emqttd/src/emqttd_pubsub.erl

@@ -103,7 +103,13 @@ start_link(Id, Opts) ->
 %%------------------------------------------------------------------------------
 -spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
 create(Topic) when is_binary(Topic) ->
-    call({create, Topic}).
+    TopicR = #mqtt_topic{topic = Topic, node = node()},
+    case mnesia:transaction(fun add_topic/1, [TopicR]) of
+        {atomic, ok} ->
+            setstats(topics), ok;
+        {aborted, Error} ->
+            {error, Error}
+    end.
 
 %%------------------------------------------------------------------------------
 %% @doc Subscribe topic
@@ -193,16 +199,6 @@ init([Id, _Opts]) ->
     gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
     {ok, #state{id = Id, submap = maps:new()}}.
 
-handle_call({create, Topic}, _From, State) ->
-    TopicR = #mqtt_topic{topic = Topic, node = node()},
-    Reply = 
-    case mnesia:transaction(fun add_topic/1, [TopicR]) of
-        {atomic, ok} -> ok;
-        {aborted, Error} -> {error, Error}
-    end,
-    setstats(topics), 
-    {reply, Reply, State};
-
 handle_call({subscribe, SubPid, Topics}, _From, State) ->
     TopicSubs = lists:map(fun({Topic, Qos}) ->
                     {#mqtt_topic{topic = Topic, node = node()},