Feng Lee 10 лет назад
Родитель
Сommit
d6a528e0f7

+ 2 - 0
TODO

@@ -12,3 +12,5 @@ TODO 5. dashboard
 TODO 6. emqttd_ctl
 TODO 6. emqttd_ctl
 
 
 TODO 7. transaction on route, and topic?
 TODO 7. transaction on route, and topic?
+
+TODO 8. topics, subscriptions CLI

+ 2 - 1
src/emqttd.app.src

@@ -8,7 +8,8 @@
   {applications, [kernel,
   {applications, [kernel,
                   stdlib,
                   stdlib,
                   gproc,
                   gproc,
-                  esockd]},
+                  esockd,
+                  mochiweb]},
   {mod, {emqttd_app, []}},
   {mod, {emqttd_app, []}},
   {env, []}
   {env, []}
  ]}.
  ]}.

+ 11 - 8
src/emqttd.erl

@@ -48,7 +48,7 @@
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec start() -> ok | {error, any()}.
 -spec start() -> ok | {error, any()}.
 start() ->
 start() ->
-    application:start(emqttd).
+    application:start(?APP).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc Get environment
 %% @doc Get environment
@@ -56,7 +56,7 @@ start() ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec env(atom()) -> list().
 -spec env(atom()) -> list().
 env(Group) ->
 env(Group) ->
-    application:get_env(emqttd, Group, []).
+    application:get_env(?APP, Group, []).
 
 
 -spec env(atom(), atom()) -> undefined | any().
 -spec env(atom(), atom()) -> undefined | any().
 env(Group, Name) ->
 env(Group, Name) ->
@@ -111,11 +111,14 @@ stop_listener({Protocol, Port, _Options}) ->
     esockd:close({Protocol, Port}).
     esockd:close({Protocol, Port}).
 
 
 load_all_mods() ->
 load_all_mods() ->
-    lists:foreach(fun({Name, Opts}) ->
-        Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
-        Mod:load(Opts),
-        lager:info("load module ~s successfully", [Name])
-    end, env(modules)).
+    lists:foreach(fun load_mod/1, env(modules)).
+
+load_mod({Name, Opts}) ->
+    Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
+    case catch Mod:load(Opts) of
+        {ok, _State}     -> lager:info("load module ~s successfully", [Name]);
+        {'EXIT', Reason} -> lager:error("load module ~s error: ~p", [Name, Reason])
+    end.
 
 
 is_mod_enabled(Name) ->
 is_mod_enabled(Name) ->
     env(modules, Name) =/= undefined.
     env(modules, Name) =/= undefined.
@@ -125,7 +128,7 @@ is_mod_enabled(Name) ->
 %% @end
 %% @end
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 is_running(Node) ->
 is_running(Node) ->
-    case rpc:call(Node, erlang, whereis, [emqttd]) of
+    case rpc:call(Node, erlang, whereis, [?APP]) of
         {badrpc, _}          -> false;
         {badrpc, _}          -> false;
         undefined            -> false;
         undefined            -> false;
         Pid when is_pid(Pid) -> true
         Pid when is_pid(Pid) -> true

+ 20 - 17
src/emqttd_access_control.erl

@@ -59,9 +59,9 @@
 start_link() ->
 start_link() ->
     start_link(emqttd:env(access)).
     start_link(emqttd:env(access)).
 
 
--spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}.
-start_link(AcOpts) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []).
+-spec start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}.
+start_link(Opts) ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc Authenticate MQTT Client
 %% @doc Authenticate MQTT Client
@@ -73,10 +73,11 @@ auth(Client, Password) when is_record(Client, mqtt_client) ->
 auth(_Client, _Password, []) ->
 auth(_Client, _Password, []) ->
     {error, "No auth module to check!"};
     {error, "No auth module to check!"};
 auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
 auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
-    case Mod:check(Client, Password, State) of
-        ok -> ok;
+    case catch Mod:check(Client, Password, State) of
+        ok              -> ok;
+        ignore          -> auth(Client, Password, Mods);
         {error, Reason} -> {error, Reason};
         {error, Reason} -> {error, Reason};
-        ignore -> auth(Client, Password, Mods)
+        {'EXIT', Error} -> {error, Error}
     end.
     end.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -114,7 +115,7 @@ reload_acl() ->
 %% @doc Register authentication or ACL module
 %% @doc Register authentication or ACL module
 %% @end
 %% @end
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
--spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}.
+-spec register_mod(auth | acl, atom(), list()) -> ok | {error, any()}.
 register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
 register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
     register_mod(Type, Mod, Opts, 0).
     register_mod(Type, Mod, Opts, 0).
 
 
@@ -140,10 +141,9 @@ lookup_mods(Type) ->
         [] -> [];
         [] -> [];
         [{_, Mods}] -> Mods
         [{_, Mods}] -> Mods
     end.
     end.
-tab_key(auth) ->
-    auth_modules;
-tab_key(acl) ->
-    acl_modules.
+
+tab_key(auth) -> auth_modules;
+tab_key(acl)  -> acl_modules.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc Stop access control server
 %% @doc Stop access control server
@@ -156,10 +156,10 @@ stop() ->
 %%% gen_server callbacks
 %%% gen_server callbacks
 %%%=============================================================================
 %%%=============================================================================
 
 
-init([AcOpts]) ->
-	ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
-    ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}),
-    ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}),
+init([Opts]) ->
+	ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected]),
+    ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
+    ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
 	{ok, state}.
 	{ok, state}.
 
 
 init_mods(auth, AuthMods) ->
 init_mods(auth, AuthMods) ->
@@ -230,8 +230,11 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 %%%=============================================================================
 
 
 authmod(Name) when is_atom(Name) ->
 authmod(Name) when is_atom(Name) ->
-	list_to_atom(lists:concat(["emqttd_auth_", Name])).
+    mod(emqttd_auth_, Name).
 
 
 aclmod(Name) when is_atom(Name) ->
 aclmod(Name) when is_atom(Name) ->
-	list_to_atom(lists:concat(["emqttd_acl_", Name])).
+    mod(emqttd_acl_, Name).
+
+mod(Prefix, Name) ->
+    list_to_atom(lists:concat([Prefix, Name])).
 
 

+ 2 - 3
src/emqttd_acl_internal.erl

@@ -63,7 +63,7 @@ all_rules() ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec init(AclOpts :: list()) -> {ok, State :: any()}.
 -spec init(AclOpts :: list()) -> {ok, State :: any()}.
 init(AclOpts) ->
 init(AclOpts) ->
-    ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
+    ets:new(?ACL_RULE_TAB, [set, public, named_table]),
     AclFile = proplists:get_value(file, AclOpts),
     AclFile = proplists:get_value(file, AclOpts),
     Default = proplists:get_value(nomatch, AclOpts, allow),
     Default = proplists:get_value(nomatch, AclOpts, allow),
     State = #state{acl_file = AclFile, nomatch = Default},
     State = #state{acl_file = AclFile, nomatch = Default},
@@ -139,6 +139,5 @@ reload_acl(State) ->
 %% @end
 %% @end
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec description() -> string().
 -spec description() -> string().
-description() ->
-    "Internal ACL with etc/acl.config".
+description() -> "Internal ACL with etc/acl.config".
 
 

+ 2 - 2
src/emqttd_alarm.erl

@@ -105,10 +105,10 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id       = AlarmId,
     emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)),
     emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)),
     {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
     {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
 
 
-handle_event({clear_alarm, AlarmId}, Alarms)->
+handle_event({clear_alarm, AlarmId}, Alarms) ->
     Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_util:now_to_secs()}]),
     Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_util:now_to_secs()}]),
     emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)),
     emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)),
-    {ok, lists:keydelete(AlarmId, 2, Alarms)};
+    {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate};
 
 
 handle_event(_, Alarms)->
 handle_event(_, Alarms)->
     {ok, Alarms}.
     {ok, Alarms}.

+ 1 - 1
src/emqttd_app.erl

@@ -78,7 +78,7 @@ start_servers(Sup) ->
                {"emqttd mod supervisor", emqttd_mod_sup},
                {"emqttd mod supervisor", emqttd_mod_sup},
                {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
                {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
                {"emqttd access control", emqttd_access_control},
                {"emqttd access control", emqttd_access_control},
-               {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}],
+               {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}],
     [start_server(Sup, Server) || Server <- Servers].
     [start_server(Sup, Server) || Server <- Servers].
 
 
 start_server(_Sup, {Name, F}) when is_function(F) ->
 start_server(_Sup, {Name, F}) when is_function(F) ->

+ 1 - 2
src/emqttd_auth_ldap.erl

@@ -81,6 +81,5 @@ ldap_bind(LDAP, UserDn, Password) ->
 fill(Username, UserDn) ->
 fill(Username, UserDn) ->
     re:replace(UserDn, "\\$u", Username, [global, {return, list}]).
     re:replace(UserDn, "\\$u", Username, [global, {return, list}]).
 
 
-description() -> 
-    "LDAP Authentication Module".
+description() -> "LDAP Authentication Module".
 
 

+ 2 - 2
src/emqttd_bridge.erl

@@ -119,12 +119,12 @@ handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) ->
 
 
 handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
 handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
     rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
     rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
-    {noreply, State};
+    {noreply, State, hibernate};
 
 
 handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
 handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
     lager:warning("Bridge Node Down: ~p", [Node]),
     lager:warning("Bridge Node Down: ~p", [Node]),
     erlang:send_after(Interval, self(), ping_down_node),
     erlang:send_after(Interval, self(), ping_down_node),
-    {noreply, State#state{status = down}};
+    {noreply, State#state{status = down}, hibernate};
 
 
 handle_info({nodeup, Node}, State = #state{node = Node}) ->
 handle_info({nodeup, Node}, State = #state{node = Node}) ->
     %% TODO: Really fast??
     %% TODO: Really fast??

+ 1 - 1
src/emqttd_bridge_sup.erl

@@ -80,7 +80,7 @@ start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic)
 stop_bridge(Node, SubTopic) ->
 stop_bridge(Node, SubTopic) ->
     ChildId = bridge_id(Node, SubTopic),
     ChildId = bridge_id(Node, SubTopic),
     case supervisor:terminate_child(?MODULE, ChildId) of
     case supervisor:terminate_child(?MODULE, ChildId) of
-        ok -> 
+        ok ->
             supervisor:delete_child(?MODULE, ChildId);
             supervisor:delete_child(?MODULE, ChildId);
         {error, Reason} ->
         {error, Reason} ->
             {error, Reason}
             {error, Reason}

+ 2 - 2
src/emqttd_broker.erl

@@ -216,7 +216,7 @@ stop_tick(TRef) ->
 %%%=============================================================================
 %%%=============================================================================
 
 
 init([]) ->
 init([]) ->
-    random:seed(now()),
+    random:seed(os:timestamp()),
     ets:new(?BROKER_TAB, [set, public, named_table]),
     ets:new(?BROKER_TAB, [set, public, named_table]),
     % Create $SYS Topics
     % Create $SYS Topics
     emqttd_pubsub:create(<<"$SYS/brokers">>),
     emqttd_pubsub:create(<<"$SYS/brokers">>),
@@ -270,7 +270,7 @@ handle_info(tick, State) ->
     retain(brokers),
     retain(brokers),
     retain(version,  list_to_binary(version())),
     retain(version,  list_to_binary(version())),
     retain(sysdescr, list_to_binary(sysdescr())),
     retain(sysdescr, list_to_binary(sysdescr())),
-    {noreply, State};
+    {noreply, State, hibernate};
 
 
 handle_info(_Info, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
     {noreply, State}.

+ 14 - 8
src/emqttd_cli.erl

@@ -39,6 +39,8 @@
          clients/1, sessions/1, plugins/1, listeners/1,
          clients/1, sessions/1, plugins/1, listeners/1,
          vm/1, mnesia/1, trace/1]).
          vm/1, mnesia/1, trace/1]).
 
 
+%% TODO: topics, subscriptions...
+
 -define(PROC_INFOKEYS, [status,
 -define(PROC_INFOKEYS, [status,
                         memory,
                         memory,
                         message_queue_len,
                         message_queue_len,
@@ -47,6 +49,8 @@
                         stack_size,
                         stack_size,
                         reductions]).
                         reductions]).
 
 
+-define(APP, emqttd).
+
 load() ->
 load() ->
     Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
     Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
     [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
     [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
@@ -65,10 +69,10 @@ is_cmd(Fun) ->
 status([]) ->
 status([]) ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
     {InternalStatus, _ProvidedStatus} = init:get_status(),
     ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
     ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
-    case lists:keysearch(emqttd, 1, application:which_applications()) of
+    case lists:keysearch(?APP, 1, application:which_applications()) of
     false ->
     false ->
         ?PRINT_MSG("emqttd is not running~n");
         ?PRINT_MSG("emqttd is not running~n");
-    {value, {emqttd, _Desc, Vsn}} ->
+    {value, {?APP, _Desc, Vsn}} ->
         ?PRINT("emqttd ~s is running~n", [Vsn])
         ?PRINT("emqttd ~s is running~n", [Vsn])
     end;
     end;
 status(_) ->
 status(_) ->
@@ -129,13 +133,9 @@ cluster([SNode]) ->
         false ->
         false ->
             cluster(Node, fun() ->
             cluster(Node, fun() ->
                 emqttd_plugins:unload(),
                 emqttd_plugins:unload(),
-                application:stop(emqttd),
-                application:stop(esockd),
-                application:stop(gproc),
+                stop_apps(),
                 emqttd_mnesia:cluster(Node),
                 emqttd_mnesia:cluster(Node),
-                application:start(gproc),
-                application:start(esockd),
-                application:start(emqttd)
+                start_apps() 
            end)
            end)
     end;
     end;
 
 
@@ -157,6 +157,12 @@ cluster(pong, Node, DoCluster) ->
 cluster(pang, Node, _DoCluster) ->
 cluster(pang, Node, _DoCluster) ->
     ?PRINT("Cannot connect to ~s~n", [Node]).
     ?PRINT("Cannot connect to ~s~n", [Node]).
 
 
+stop_apps() ->
+    [application:stop(App) || App <- [emqttd, esockd, gproc]].
+
+start_apps() ->
+    [application:start(App) || App <- [gproc, esockd, emqttd]].
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc Query clients
 %% @doc Query clients
 %% @end
 %% @end

+ 45 - 0
src/emqttd_sysmon_sup.erl

@@ -0,0 +1,45 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc emqttd sysmon supervisor.
+%%%
+%%% @author Feng Lee <feng@emqtt.io>
+%%%-----------------------------------------------------------------------------
+-module(emqttd_sysmon_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Env    = emqttd:env(sysmon),
+    Sysmon = {sysmon,
+               {emqttd_sysmon, start_link, [Env]},
+                 permanent, 5000, worker, [emqttd_sysmon]},
+    {ok, {{one_for_one, 10, 100}, [Sysmon]}}.
+