Просмотр исходного кода

Optimize connection and session management

Feng Lee 7 лет назад
Родитель
Сommit
8f2f4b6b81
7 измененных файлов с 78 добавлено и 71 удалено
  1. 16 8
      src/emqx_cm.erl
  2. 6 7
      src/emqx_protocol.erl
  3. 7 8
      src/emqx_session.erl
  4. 1 1
      src/emqx_session_sup.erl
  5. 45 43
      src/emqx_sm.erl
  6. 1 1
      src/emqx_sm_locker.erl
  7. 2 3
      src/emqx_sm_registry.erl

+ 16 - 8
src/emqx_cm.erl

@@ -36,7 +36,7 @@
 -define(CM, ?MODULE).
 -define(CM, ?MODULE).
 
 
 %% ETS Tables.
 %% ETS Tables.
--define(CONN_TAB,       emqx_conn).
+-define(CONN_TAB, emqx_conn).
 -define(CONN_ATTRS_TAB, emqx_conn_attrs).
 -define(CONN_ATTRS_TAB, emqx_conn_attrs).
 -define(CONN_STATS_TAB, emqx_conn_stats).
 -define(CONN_STATS_TAB, emqx_conn_stats).
 
 
@@ -56,7 +56,7 @@ register_connection(ClientId) when is_binary(ClientId) ->
     register_connection({ClientId, self()});
     register_connection({ClientId, self()});
 
 
 register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
 register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    _ = ets:insert(?CONN_TAB, Conn),
+    true = ets:insert(?CONN_TAB, Conn),
     notify({registered, ClientId, ConnPid}).
     notify({registered, ClientId, ConnPid}).
 
 
 -spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
 -spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
@@ -87,10 +87,13 @@ unregister_connection(ClientId) when is_binary(ClientId) ->
     unregister_connection({ClientId, self()});
     unregister_connection({ClientId, self()});
 
 
 unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
 unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    _ = ets:delete(?CONN_STATS_TAB, Conn),
-    _ = ets:delete(?CONN_ATTRS_TAB, Conn),
-    _ = ets:delete_object(?CONN_TAB, Conn),
-    notify({unregistered, ClientId, ConnPid}).
+    do_unregister_connection(Conn),
+    notify({unregistered, ConnPid}).
+
+do_unregister_connection(Conn) ->
+    true = ets:delete(?CONN_STATS_TAB, Conn),
+    true = ets:delete(?CONN_ATTRS_TAB, Conn),
+    true = ets:delete_object(?CONN_TAB, Conn).
 
 
 %% @doc Lookup connection pid
 %% @doc Lookup connection pid
 -spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
 -spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
@@ -138,7 +141,7 @@ handle_call(Req, _From, State) ->
 handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
 handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
     {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
     {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
 
 
-handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
+handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
     {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
     {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
 
 
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
@@ -150,7 +153,12 @@ handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := P
         undefined ->
         undefined ->
             {noreply, State};
             {noreply, State};
         ClientId ->
         ClientId ->
-            unregister_connection({ClientId, ConnPid}),
+            Conn = {ClientId, ConnPid},
+            case ets:member(?CONN_ATTRS_TAB, Conn) of
+                true ->
+                    ok = emqx_pool:async_submit(fun do_unregister_connection/1, [Conn]);
+                false -> ok
+            end,
             {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
             {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
     end;
     end;
 
 

+ 6 - 7
src/emqx_protocol.erl

@@ -849,14 +849,13 @@ shutdown(_Reason, #pstate{client_id = undefined}) ->
     ok;
     ok;
 shutdown(_Reason, #pstate{connected = false}) ->
 shutdown(_Reason, #pstate{connected = false}) ->
     ok;
     ok;
-shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
-                                                     Reason =:= discard ->
-    emqx_cm:unregister_connection(ClientId);
-shutdown(Reason, PState = #pstate{connected = true,
-                                  client_id = ClientId}) ->
+shutdown(conflict, _PState) ->
+    ok;
+shutdown(discard, _PState) ->
+    ok;
+shutdown(Reason, PState) ->
     ?LOG(info, "Shutdown for ~p", [Reason]),
     ?LOG(info, "Shutdown for ~p", [Reason]),
-    emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
-    emqx_cm:unregister_connection(ClientId).
+    emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
 
 
 start_keepalive(0, _PState) ->
 start_keepalive(0, _PState) ->
     ignore;
     ignore;

+ 7 - 8
src/emqx_session.erl

@@ -645,16 +645,14 @@ handle_info(Info, State) ->
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     {noreply, State}.
     {noreply, State}.
 
 
-terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) ->
-    emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
+terminate(Reason, #state{will_msg = WillMsg, conn_pid = ConnPid}) ->
+    %% Should not run hooks here.
+    %% emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
     send_willmsg(WillMsg),
     send_willmsg(WillMsg),
     %% Ensure to shutdown the connection
     %% Ensure to shutdown the connection
-    if
-        ConnPid =/= undefined ->
-            ConnPid ! {shutdown, Reason};
-        true -> ok
-    end,
-    emqx_sm:unregister_session(ClientId).
+    (ConnPid =:= undefined) orelse ConnPid ! {shutdown, Reason}.
+    %% Let it crash.
+    %% emqx_sm:unregister_session(ClientId).
 
 
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
@@ -1011,3 +1009,4 @@ noreply(State) ->
 
 
 shutdown(Reason, State) ->
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
     {stop, {shutdown, Reason}, State}.
+

+ 1 - 1
src/emqx_session_sup.erl

@@ -38,7 +38,7 @@ init([]) ->
           [#{id       => session,
           [#{id       => session,
              start    => {emqx_session, start_link, []},
              start    => {emqx_session, start_link, []},
              restart  => temporary,
              restart  => temporary,
-             shutdown => 5000,
+             shutdown => brutal_kill,
              type     => worker,
              type     => worker,
              modules  => [emqx_session]}]}}.
              modules  => [emqx_session]}]}}.
 
 

+ 45 - 43
src/emqx_sm.erl

@@ -40,9 +40,9 @@
 
 
 -define(SM, ?MODULE).
 -define(SM, ?MODULE).
 
 
-%% ETS Tables
--define(SESSION_TAB,       emqx_session).
--define(SESSION_P_TAB,     emqx_persistent_session).
+%% ETS Tables for session management.
+-define(SESSION_TAB, emqx_session).
+-define(SESSION_P_TAB, emqx_session_p).
 -define(SESSION_ATTRS_TAB, emqx_session_attrs).
 -define(SESSION_ATTRS_TAB, emqx_session_attrs).
 -define(SESSION_STATS_TAB, emqx_session_stats).
 -define(SESSION_STATS_TAB, emqx_session_stats).
 
 
@@ -59,8 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
                  end,
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
     emqx_sm_locker:trans(ClientId, CleanStart);
 
 
-open_session(SessAttrs = #{clean_start := false,
-                           client_id   := ClientId}) ->
+open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
     ResumeStart = fun(_) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId, SessAttrs) of
                       case resume_session(ClientId, SessAttrs) of
                           {ok, SPid} ->
                           {ok, SPid} ->
@@ -77,13 +76,14 @@ discard_session(ClientId) when is_binary(ClientId) ->
     discard_session(ClientId, self()).
     discard_session(ClientId, self()).
 
 
 discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
 discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
-    lists:foreach(fun({_ClientId, SPid}) ->
-                      case catch emqx_session:discard(SPid, ConnPid) of
-                          {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
-                              emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
-                          ok -> ok
-                      end
-                  end, lookup_session(ClientId)).
+    lists:foreach(
+      fun({_ClientId, SPid}) ->
+              case catch emqx_session:discard(SPid, ConnPid) of
+                  {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
+                      emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
+                  ok -> ok
+              end
+      end, lookup_session(ClientId)).
 
 
 %% @doc Try to resume a session.
 %% @doc Try to resume a session.
 -spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
 -spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
@@ -116,19 +116,18 @@ close_session(SPid) when is_pid(SPid) ->
 register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
 register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
     register_session({ClientId, self()}, SessAttrs);
     register_session({ClientId, self()}, SessAttrs);
 
 
-register_session(Session = {ClientId, SPid}, SessAttrs)
-    when is_binary(ClientId), is_pid(SPid) ->
-    ets:insert(?SESSION_TAB, Session),
-    ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
-    proplists:get_value(clean_start, SessAttrs, true)
-        andalso ets:insert(?SESSION_P_TAB, Session),
-    emqx_sm_registry:register_session(Session),
+register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
+    true = ets:insert(?SESSION_TAB, Session),
+    true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
+    true = proplists:get_value(clean_start, SessAttrs, true)
+            orelse ets:insert(?SESSION_P_TAB, Session),
+    ok = emqx_sm_registry:register_session(Session),
     notify({registered, ClientId, SPid}).
     notify({registered, ClientId, SPid}).
 
 
 %% @doc Get session attrs
 %% @doc Get session attrs
 -spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
 -spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
 get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
 get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
+    emqx_tables:lookup_value(?SESSION_ATTRS_TAB, Session, []).
 
 
 %% @doc Set session attrs
 %% @doc Set session attrs
 -spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
 -spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
@@ -144,17 +143,21 @@ unregister_session(ClientId) when is_binary(ClientId) ->
     unregister_session({ClientId, self()});
     unregister_session({ClientId, self()});
 
 
 unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
 unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    emqx_sm_registry:unregister_session(Session),
-    ets:delete(?SESSION_STATS_TAB, Session),
-    ets:delete(?SESSION_ATTRS_TAB, Session),
-    ets:delete_object(?SESSION_P_TAB, Session),
-    ets:delete_object(?SESSION_TAB, Session),
+    ok = do_unregister_session(Session),
     notify({unregistered, ClientId, SPid}).
     notify({unregistered, ClientId, SPid}).
 
 
+%% @private
+do_unregister_session(Session) ->
+    true = ets:delete(?SESSION_STATS_TAB, Session),
+    true = ets:delete(?SESSION_ATTRS_TAB, Session),
+    true = ets:delete_object(?SESSION_P_TAB, Session),
+    true = ets:delete_object(?SESSION_TAB, Session),
+    emqx_sm_registry:unregister_session(Session).
+
 %% @doc Get session stats
 %% @doc Get session stats
 -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
 -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
 get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
 get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    safe_lookup_element(?SESSION_STATS_TAB, Session, []).
+    emqx_tables:lookup_value(?SESSION_STATS_TAB, Session, []).
 
 
 %% @doc Set session stats
 %% @doc Set session stats
 -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
 -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
@@ -168,7 +171,7 @@ set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), i
 -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
 -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
 lookup_session(ClientId) ->
 lookup_session(ClientId) ->
     case emqx_sm_registry:is_enabled() of
     case emqx_sm_registry:is_enabled() of
-        true  -> emqx_sm_registry:lookup_session(ClientId);
+        true -> emqx_sm_registry:lookup_session(ClientId);
         false -> ets:lookup(?SESSION_TAB, ClientId)
         false -> ets:lookup(?SESSION_TAB, ClientId)
     end.
     end.
 
 
@@ -185,13 +188,7 @@ dispatch(ClientId, Topic, Msg) ->
 %% @doc Lookup session pid.
 %% @doc Lookup session pid.
 -spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
 -spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
 lookup_session_pid(ClientId) ->
 lookup_session_pid(ClientId) ->
-    safe_lookup_element(?SESSION_TAB, ClientId, undefined).
-
-safe_lookup_element(Tab, Key, Default) ->
-    try ets:lookup_element(Tab, Key, 2)
-    catch
-        error:badarg -> Default
-    end.
+    emqx_tables:lookup_value(?SESSION_TAB, ClientId).
 
 
 notify(Event) ->
 notify(Event) ->
     gen_server:cast(?SM, {notify, Event}).
     gen_server:cast(?SM, {notify, Event}).
@@ -207,29 +204,34 @@ init([]) ->
     ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
     ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
     ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
-    {ok, #{session_pmon => emqx_pmon:new()}}.
+    {ok, #{sess_pmon => emqx_pmon:new()}}.
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
     {reply, ignored, State}.
 
 
-handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) ->
-    {noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
+handle_cast({notify, {registered, ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
+    {noreply, State#{sess_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
 
 
-handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) ->
-    {noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}};
+handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
+    {noreply, State#{sess_pmon := emqx_pmon:demonitor(SPid, PMon)}};
 
 
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
     {noreply, State}.
     {noreply, State}.
 
 
-handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) ->
+handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{sess_pmon := PMon}) ->
     case emqx_pmon:find(DownPid, PMon) of
     case emqx_pmon:find(DownPid, PMon) of
         undefined ->
         undefined ->
             {noreply, State};
             {noreply, State};
-        ClientId  ->
-            unregister_session({ClientId, DownPid}),
-            {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}}
+        ClientId ->
+            Session = {ClientId, DownPid},
+            case ets:member(?SESSION_ATTRS_TAB, Session) of
+                true ->
+                    ok = emqx_pool:async_submit(fun do_unregister_session/1, [Session]);
+                false -> ok
+            end,
+            {noreply, State#{sess_pmon := emqx_pmon:erase(DownPid, PMon)}}
     end;
     end;
 
 
 handle_info(Info, State) ->
 handle_info(Info, State) ->

+ 1 - 1
src/emqx_sm_locker.erl

@@ -21,7 +21,7 @@
 -export([trans/2, trans/3]).
 -export([trans/2, trans/3]).
 -export([lock/1, lock/2, unlock/1]).
 -export([lock/1, lock/2, unlock/1]).
 
 
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
+-spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
 start_link() ->
     ekka_locker:start_link(?MODULE).
     ekka_locker:start_link(?MODULE).
 
 

+ 2 - 3
src/emqx_sm_registry.erl

@@ -41,8 +41,7 @@ start_link() ->
     gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
     gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
 
 
 -spec(is_enabled() -> boolean()).
 -spec(is_enabled() -> boolean()).
-is_enabled() ->
-    ets:info(?TAB, name) =/= undefined.
+is_enabled() -> ets:info(?TAB, name) =/= undefined.
 
 
 -spec(lookup_session(emqx_types:client_id())
 -spec(lookup_session(emqx_types:client_id())
       -> list({emqx_types:client_id(), session_pid()})).
       -> list({emqx_types:client_id(), session_pid()})).
@@ -73,7 +72,7 @@ init([]) ->
                 {storage_properties, [{ets, [{read_concurrency, true},
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]),
                                              {write_concurrency, true}]}]}]),
     ok = ekka_mnesia:copy_table(?TAB),
     ok = ekka_mnesia:copy_table(?TAB),
-    _ = ekka:monitor(membership),
+    ok = ekka:monitor(membership),
     {ok, #{}}.
     {ok, #{}}.
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->