Feng Lee 11 лет назад
Родитель
Сommit
f1632ef2df

+ 1 - 1
apps/emqttd/src/emqttd_app.erl

@@ -75,7 +75,7 @@ start_servers(Sup) ->
             {"emqttd config", emqttd_config},
             {"emqttd config", emqttd_config},
             {"emqttd event", emqttd_event},
             {"emqttd event", emqttd_event},
             {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
             {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
-            {"emqttd client manager", emqttd_cm},
+            {"emqttd client manager", {supervisor, emqttd_cm_sup}},
             {"emqttd session manager", emqttd_sm},
             {"emqttd session manager", emqttd_sm},
             {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
             {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
             {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
             {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},

+ 43 - 73
apps/emqttd/src/emqttd_cm.erl

@@ -33,24 +33,17 @@
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
 
 
 %% API Exports 
 %% API Exports 
--export([start_link/0]).
+-export([start_link/2]).
 
 
 -export([lookup/1, register/1, unregister/1]).
 -export([lookup/1, register/1, unregister/1]).
 
 
-%% Stats 
--export([getstats/0]).
-
 %% gen_server Function Exports
 %% gen_server Function Exports
--export([init/1,
-		 handle_call/3,
-		 handle_cast/2,
-		 handle_info/2,
-         terminate/2,
-		 code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
 
 
 -record(state, {tab}).
 -record(state, {tab}).
 
 
--define(CLIENT_TAB, mqtt_client).
+-define(POOL, cm).
 
 
 %%%=============================================================================
 %%%=============================================================================
 %%% API
 %%% API
@@ -62,9 +55,11 @@
 %%
 %%
 %% @end
 %% @end
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
--spec start_link() -> {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when
+        Id :: pos_integer(),
+        TabId :: ets:tid().
+start_link(Id, TabId) ->
+    gen_server:start_link(?MODULE, [Id, TabId], []).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc
 %% @doc
@@ -74,7 +69,7 @@ start_link() ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec lookup(ClientId :: binary()) -> pid() | undefined.
 -spec lookup(ClientId :: binary()) -> pid() | undefined.
 lookup(ClientId) when is_binary(ClientId) ->
 lookup(ClientId) when is_binary(ClientId) ->
-	case ets:lookup(?CLIENT_TAB, ClientId) of
+    case ets:lookup(emqttd_cm_sup:table(), ClientId) of
 	[{_, Pid, _}] -> Pid;
 	[{_, Pid, _}] -> Pid;
 	[] -> undefined
 	[] -> undefined
 	end.
 	end.
@@ -85,12 +80,8 @@ lookup(ClientId) when is_binary(ClientId) ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec register(ClientId :: binary()) -> ok.
 -spec register(ClientId :: binary()) -> ok.
 register(ClientId) when is_binary(ClientId) ->
 register(ClientId) when is_binary(ClientId) ->
-    Pid = self(),
-    %% this is atomic
-    case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of
-        true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid});
-        false -> gen_server:cast(?SERVER, {register, ClientId, Pid})
-    end.
+    CmPid = gproc_pool:pick_worker(?POOL, ClientId),
+    gen_server:call(CmPid, {register, ClientId, self()}, infinity).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc
 %% @doc
@@ -100,54 +91,46 @@ register(ClientId) when is_binary(ClientId) ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 -spec unregister(ClientId :: binary()) -> ok.
 -spec unregister(ClientId :: binary()) -> ok.
 unregister(ClientId) when is_binary(ClientId) ->
 unregister(ClientId) when is_binary(ClientId) ->
-    gen_server:cast(?SERVER, {unregister, ClientId, self()}).
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Get statistics of client manager.
-%%
-%% @end
-%%------------------------------------------------------------------------------
-getstats() ->
-    [{Name, emqttd_broker:getstat(Name)} || 
-        Name <- ['clients/count', 'clients/max']].
+    CmPid = gproc_pool:pick_worker(?POOL, ClientId),
+    gen_server:cast(CmPid, {unregister, ClientId, self()}).
 
 
 %%%=============================================================================
 %%%=============================================================================
 %%% gen_server callbacks
 %%% gen_server callbacks
 %%%=============================================================================
 %%%=============================================================================
 
 
-init([]) ->
-    TabId = ets:new(?CLIENT_TAB, [set,
-                                  named_table,
-                                  public,
-                                  {write_concurrency, true}]),
+init([Id, TabId]) ->
+    gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
     {ok, #state{tab = TabId}}.
     {ok, #state{tab = TabId}}.
 
 
-handle_call(Req, _From, State) ->
-    lager:error("unexpected request: ~p", [Req]),
-    {reply, {error, badreq}, State}.
-
-handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) ->
-    case registerd(Tab, {ClientId, Pid}) of
-        true -> 
+handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
+	case ets:lookup(Tab, ClientId) of
+        [{_, Pid, _}] ->
+			lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
             ignore;
             ignore;
-        false -> 
+		[{_, OldPid, MRef}] ->
+			lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
+            %%TODO: tell session old client is down here?
+            case emqttd_session:lookup(ClientId) of
+                undefined -> ok;
+                SessPid -> emqttd_session:client_down(SessPid, {OldPid, duplicate_id})
+            end,
+			OldPid ! {stop, duplicate_id, Pid},
+			erlang:demonitor(MRef),
+            ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
+		[] -> 
             ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
             ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
-    end,
-    {noreply, setstats(State)};
+	end,
+    {reply, ok, State};
 
 
-handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
-    case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of
-        true -> ok;
-        false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid]) 
-    end,
-    {noreply, setstats(State)};
+handle_call(Req, _From, State) ->
+    lager:error("unexpected request: ~p", [Req]),
+    {reply, {error, badreq}, State}.
 
 
-handle_cast({unregister, ClientId, Pid}, State) ->
-	case ets:lookup(?CLIENT_TAB, ClientId) of
+handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
+	case ets:lookup(TabId, ClientId) of
 	[{_, Pid, MRef}] ->
 	[{_, Pid, MRef}] ->
 		erlang:demonitor(MRef, [flush]),
 		erlang:demonitor(MRef, [flush]),
-		ets:delete(?CLIENT_TAB, ClientId);
+		ets:delete(TabId, ClientId);
 	[_] -> 
 	[_] -> 
 		ignore;
 		ignore;
 	[] ->
 	[] ->
@@ -158,8 +141,8 @@ handle_cast({unregister, ClientId, Pid}, State) ->
 handle_cast(_Msg, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
     {noreply, State}.
 
 
-handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
-	ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
+handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) ->
+	ets:match_delete(TabId, {'_', DownPid, MRef}),
     {noreply, setstats(State)};
     {noreply, setstats(State)};
 
 
 handle_info(_Info, State) ->
 handle_info(_Info, State) ->
@@ -174,23 +157,10 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 %%%=============================================================================
 %%% Internal functions
 %%% Internal functions
 %%%=============================================================================
 %%%=============================================================================
-registerd(Tab, {ClientId, Pid}) ->
-	case ets:lookup(Tab, ClientId) of
-        [{_, Pid, _}] ->
-			lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
-            true;
-		[{_, OldPid, MRef}] ->
-			lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
-			OldPid ! {stop, duplicate_id, Pid},
-			erlang:demonitor(MRef),
-            false;
-		[] -> 
-            false
-	end.
 
 
-setstats(State) ->
+setstats(State = #state{tab = TabId}) ->
     emqttd_broker:setstats('clients/count',
     emqttd_broker:setstats('clients/count',
                            'clients/max',
                            'clients/max',
-                           ets:info(?CLIENT_TAB, size)), State.
+                           ets:info(TabId, size)), State.
 
 
 
 

+ 62 - 0
apps/emqttd/src/emqttd_cm_sup.erl

@@ -0,0 +1,62 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd client manager supervisor.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_cm_sup).
+
+-author('feng@emqtt.io').
+
+-include("emqttd.hrl").
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, table/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(CLIENT_TAB, mqtt_client).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+table() -> ?CLIENT_TAB.
+
+init([]) ->
+    TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
+                                  {write_concurrency, true}]),
+    Schedulers = erlang:system_info(schedulers),
+    gproc_pool:new(cm, hash, [{size, Schedulers}]),
+    Children = lists:map(
+                 fun(I) ->
+                    Name = {emqttd_cm, I},
+                    gproc_pool:add_worker(cm, Name, I),
+                    {Name, {emqttd_cm, start_link, [I, TabId]},
+                        permanent, 10000, worker, [emqttd_cm]}
+                 end, lists:seq(1, Schedulers)),
+    {ok, {{one_for_all, 10, 100}, Children}}.
+
+

+ 41 - 9
apps/emqttd/src/emqttd_session.erl

@@ -42,7 +42,7 @@
 -export([store/2]).
 -export([store/2]).
 
 
 %% Start gen_server
 %% Start gen_server
--export([start_link/3]).
+-export([start_link/3, client_down/2]).
 
 
 %% gen_server Function Exports
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -252,9 +252,20 @@ initial_state(ClientId, ClientPid) ->
     State = initial_state(ClientId),
     State = initial_state(ClientId),
     State#session_state{client_pid = ClientPid}.
     State#session_state{client_pid = ClientPid}.
 
 
+%%------------------------------------------------------------------------------
+%% @doc Start a session process.
+%% @end
+%%------------------------------------------------------------------------------
 start_link(SessOpts, ClientId, ClientPid) ->
 start_link(SessOpts, ClientId, ClientPid) ->
     gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
     gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
 
 
+%%------------------------------------------------------------------------------
+%% @doc Notify the session process that client will be DOWN.
+%% @end
+%%------------------------------------------------------------------------------
+client_down(SessPid, {ClientPid, Reason}) ->
+    gen_server:cast(SessPid, {'DOWN', ClientPid, Reason}). 
+
 %%%=============================================================================
 %%%=============================================================================
 %%% gen_server callbacks
 %%% gen_server callbacks
 %%%=============================================================================
 %%%=============================================================================
@@ -279,7 +290,8 @@ handle_call({unsubscribe, Topics}, _From, State) ->
     {reply, ok, NewState};
     {reply, ok, NewState};
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
-    {stop, {badreq, Req}, State}.
+    lager:error("Unexpected request: ~p", [Req]),
+    {reply, error, State}.
 
 
 handle_cast({resume, ClientId, ClientPid}, State = #session_state{
 handle_cast({resume, ClientId, ClientPid}, State = #session_state{
                                                       clientid      = ClientId,
                                                       clientid      = ClientId,
@@ -336,8 +348,20 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) ->
     lager:warning("Session ~s destroyed", [ClientId]),
     lager:warning("Session ~s destroyed", [ClientId]),
     {stop, normal, State};
     {stop, normal, State};
 
 
+handle_cast({resume, ClientId, ClientPid}, State) ->
+    lager:error("Cannot resume session ~p with pid ~p: ~p",
+                        [ClientId, ClientPid, State]),
+    {noreply, State};
+
+handle_cast({'DOWN', ClientPid, Reason}, State = #session_state{clientid = ClientId,
+                                                                client_pid = ClientPid}) ->
+    lager:error("Session: client ~s@~p is down for ~p", [ClientId, ClientPid, Reason]),
+    unlink(ClientPid),
+    {noreply, start_expire_timer(State#session_state{client_pid = undefined})};
+
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
-    {stop, {badmsg, Msg}, State}.
+    lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), 
+    {noreply, State}.
 
 
 handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
 handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
     F = fun(Message, S) -> dispatch(Message, S) end,
     F = fun(Message, S) -> dispatch(Message, S) end,
@@ -347,18 +371,21 @@ handle_info({dispatch, {_From, Message}}, State) ->
     {noreply, dispatch(Message, State)};
     {noreply, dispatch(Message, State)};
 
 
 handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
 handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
-                                                                client_pid = ClientPid,
-                                                                expires = Expires}) ->
-    lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
-    Timer = erlang:send_after(Expires * 1000, self(), session_expired),
-    {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}};
+                                                                client_pid = ClientPid}) ->
+    lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
+    {noreply, start_expire_timer(State#session_state{client_pid = undefined})};
+
+handle_info({'EXIT', ClientPid, _Reason}, State = #session_state{client_pid = OtherClientPid}) ->
+    lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid, OtherClientPid]),
+    {noreply, State};
 
 
 handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
 handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
     lager:warning("Session ~s expired!", [ClientId]),
     lager:warning("Session ~s expired!", [ClientId]),
     {stop, {shutdown, expired}, State};
     {stop, {shutdown, expired}, State};
 
 
 handle_info(Info, State) ->
 handle_info(Info, State) ->
-    {stop, {badinfo, Info}, State}.
+    lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
+    {noreply, State}.
 
 
 terminate(_Reason, _State) ->
 terminate(_Reason, _State) ->
     ok.
     ok.
@@ -393,4 +420,9 @@ next_msg_id(State = #session_state{message_id = 16#ffff}) ->
 next_msg_id(State = #session_state{message_id = MsgId}) ->
 next_msg_id(State = #session_state{message_id = MsgId}) ->
     State#session_state{message_id = MsgId + 1}.
     State#session_state{message_id = MsgId + 1}.
 
 
+start_expire_timer(State = #session_state{expires = Expires,
+                                          expire_timer = OldTimer}) ->
+    emqttd_utils:cancel_timer(OldTimer),
+    Timer = erlang:send_after(Expires * 1000, self(), session_expired),
+    State#session_state{expire_timer = Timer}.
 
 

+ 1 - 1
apps/emqttd/src/emqttd_session_sup.erl

@@ -48,7 +48,7 @@ start_session(ClientId, ClientPid) ->
 %%% Supervisor callbacks
 %%% Supervisor callbacks
 %%%=============================================================================
 %%%=============================================================================
 init([SessOpts]) ->
 init([SessOpts]) ->
-    {ok, {{simple_one_for_one, 0, 1},
+    {ok, {{simple_one_for_one, 10, 10},
           [{session, {emqttd_session, start_link, [SessOpts]},
           [{session, {emqttd_session, start_link, [SessOpts]},
               transient, 10000, worker, [emqttd_session]}]}}.
               transient, 10000, worker, [emqttd_session]}]}}.
 
 

+ 7 - 1
apps/emqttd/src/emqttd_utils.erl

@@ -27,7 +27,8 @@
 -module(emqttd_utils).
 -module(emqttd_utils).
 
 
 -export([apply_module_attributes/1,
 -export([apply_module_attributes/1,
-         all_module_attributes/1]).
+         all_module_attributes/1,
+         cancel_timer/1]).
 
 
 %% only {F, Args}...
 %% only {F, Args}...
 apply_module_attributes(Name) ->
 apply_module_attributes(Name) ->
@@ -76,3 +77,8 @@ ignore_lib_apps(Apps) ->
                hipe, esockd, mochiweb],
                hipe, esockd, mochiweb],
     [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
     [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
 
 
+
+cancel_timer(undefined) -> 
+	undefined;
+cancel_timer(Ref) -> 
+	catch erlang:cancel_timer(Ref).