Преглед изворни кода

Implement a new session supervisor. (#2077)

Feng Lee пре 7 година
родитељ
комит
892d9439b9
3 измењених фајлова са 297 додато и 45 уклоњено
  1. 256 0
      src/emqx_session_sup.erl
  2. 10 29
      src/emqx_sm.erl
  3. 31 16
      src/emqx_sm_sup.erl

+ 256 - 0
src/emqx_session_sup.erl

@@ -0,0 +1,256 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_session_sup).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([start_session/1, count_sessions/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
+
+-type(shutdown() :: brutal_kill | infinity | pos_integer()).
+
+-record(state, {
+          sessions :: #{pid() => emqx_types:client_id()},
+          mfargs :: mfa(),
+          shutdown :: shutdown(),
+          clean_down :: fun()
+         }).
+
+-define(SUP, ?MODULE).
+-define(BATCH_EXIT, 100000).
+-define(ERROR_MSG(Format, Args),
+        error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])).
+
+%% @doc Start session supervisor.
+-spec(start_link(map()) -> emqx_types:startlink_ret()).
+start_link(SessSpec) when is_map(SessSpec) ->
+    gen_server:start_link({local, ?SUP}, ?MODULE, [SessSpec], []).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% @doc Start a session.
+-spec(start_session(map()) -> emqx_types:startlink_ret()).
+start_session(SessAttrs) ->
+    gen_server:call(?SUP, {start_session, SessAttrs}, infinity).
+
+%% @doc Count sessions.
+-spec(count_sessions() -> non_neg_integer()).
+count_sessions() ->
+    gen_server:call(?SUP, count_sessions, infinity).
+
+%%------------------------------------------------------------------------------
+%% gen_server callbacks
+%%------------------------------------------------------------------------------
+
+init([Spec]) ->
+    process_flag(trap_exit, true),
+    MFA = maps:get(start, Spec),
+    Shutdown = maps:get(shutdown, Spec, brutal_kill),
+    CleanDown = maps:get(clean_down, Spec, undefined),
+    State = #state{sessions = #{},
+                   mfargs = MFA,
+                   shutdown = Shutdown,
+                   clean_down = CleanDown
+                  },
+    {ok, State}.
+
+handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From,
+            State = #state{sessions = SessMap, mfargs = {M, F, Args}}) ->
+    try erlang:apply(M, F, [SessAttrs | Args]) of
+        {ok, Pid} ->
+            reply({ok, Pid}, State#state{sessions = maps:put(Pid, ClientId, SessMap)});
+        ignore ->
+            reply(ignore, State);
+        {error, Reason} ->
+            reply({error, Reason}, State)
+    catch
+        _:Error:Stk ->
+            ?ERROR_MSG("Failed to start session ~p: ~p, stacktrace:~n~p",
+                       [ClientId, Error, Stk]),
+            reply({error, Error}, State)
+    end;
+
+handle_call(count_sessions, _From, State = #state{sessions = SessMap}) ->
+    {reply, maps:size(SessMap), State};
+
+handle_call(Req, _From, State) ->
+    ?ERROR_MSG("unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?ERROR_MSG("unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) ->
+    SessPids = [Pid | drain_exit(?BATCH_EXIT, [])],
+    {SessItems, SessMap1} = erase_all(SessPids, SessMap),
+    (CleanDown =:= undefined)
+        orelse emqx_pool:async_submit(
+                 fun lists:foreach/2, [CleanDown, SessItems]),
+    {noreply, State#state{sessions = SessMap1}};
+
+handle_info(Info, State) ->
+    ?ERROR_MSG("unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    terminate_children(State).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+drain_exit(0, Acc) ->
+    lists:reverse(Acc);
+drain_exit(Cnt, Acc) ->
+    receive
+        {'EXIT', Pid, _Reason} ->
+            drain_exit(Cnt - 1, [Pid|Acc])
+    after 0 ->
+          lists:reverse(Acc)
+    end.
+
+erase_all(Pids, Map) ->
+    lists:foldl(
+      fun(Pid, {Acc, M}) ->
+            case maps:take(Pid, M) of
+                {Val, M1} ->
+                    {[{Val, Pid}|Acc], M1};
+                error ->
+                    {Acc, M}
+            end
+      end, {[], Map}, Pids).
+
+terminate_children(State = #state{sessions = SessMap, shutdown = Shutdown}) ->
+    {Pids, EStack0} = monitor_children(SessMap),
+    Sz = sets:size(Pids),
+    EStack =
+    case Shutdown of
+        brutal_kill ->
+            sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+            wait_children(Shutdown, Pids, Sz, undefined, EStack0);
+        infinity ->
+            sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+            wait_children(Shutdown, Pids, Sz, undefined, EStack0);
+        Time when is_integer(Time) ->
+            sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+            TRef = erlang:start_timer(Time, self(), kill),
+            wait_children(Shutdown, Pids, Sz, TRef, EStack0)
+    end,
+    %% Unroll stacked errors and report them
+    dict:fold(fun(Reason, Pid, _) ->
+                  report_error(connection_shutdown_error, Reason, Pid, State)
+              end, ok, EStack).
+
+monitor_children(SessMap) ->
+    lists:foldl(
+      fun(Pid, {Pids, EStack}) ->
+          case monitor_child(Pid) of
+              ok ->
+                  {sets:add_element(Pid, Pids), EStack};
+              {error, normal} ->
+                  {Pids, EStack};
+              {error, Reason} ->
+                  {Pids, dict:append(Reason, Pid, EStack)}
+          end
+      end, {sets:new(), dict:new()}, maps:keys(SessMap)).
+
+%% Help function to shutdown/2 switches from link to monitor approach
+monitor_child(Pid) ->
+    %% Do the monitor operation first so that if the child dies
+    %% before the monitoring is done causing a 'DOWN'-message with
+    %% reason noproc, we will get the real reason in the 'EXIT'-message
+    %% unless a naughty child has already done unlink...
+    erlang:monitor(process, Pid),
+    unlink(Pid),
+
+    receive
+	%% If the child dies before the unlik we must empty
+	%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
+	{'EXIT', Pid, Reason} ->
+	    receive
+		{'DOWN', _, process, Pid, _} ->
+		    {error, Reason}
+	    end
+    after 0 ->
+	    %% If a naughty child did unlink and the child dies before
+	    %% monitor the result will be that shutdown/2 receives a
+	    %% 'DOWN'-message with reason noproc.
+	    %% If the child should die after the unlink there
+	    %% will be a 'DOWN'-message with a correct reason
+	    %% that will be handled in shutdown/2.
+	    ok
+    end.
+
+wait_children(_Shutdown, _Pids, 0, undefined, EStack) ->
+    EStack;
+wait_children(_Shutdown, _Pids, 0, TRef, EStack) ->
+	%% If the timer has expired before its cancellation, we must empty the
+	%% mail-box of the 'timeout'-message.
+    erlang:cancel_timer(TRef),
+    receive
+        {timeout, TRef, kill} ->
+            EStack
+    after 0 ->
+            EStack
+    end;
+
+%%TODO: Copied from supervisor.erl, rewrite it later.
+wait_children(brutal_kill, Pids, Sz, TRef, EStack) ->
+    receive
+        {'DOWN', _MRef, process, Pid, killed} ->
+            wait_children(brutal_kill, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
+
+        {'DOWN', _MRef, process, Pid, Reason} ->
+            wait_children(brutal_kill, sets:del_element(Pid, Pids),
+                          Sz-1, TRef, dict:append(Reason, Pid, EStack))
+    end;
+
+wait_children(Shutdown, Pids, Sz, TRef, EStack) ->
+    receive
+        {'DOWN', _MRef, process, Pid, shutdown} ->
+            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
+        {'DOWN', _MRef, process, Pid, normal} ->
+            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
+        {'DOWN', _MRef, process, Pid, Reason} ->
+            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1,
+                          TRef, dict:append(Reason, Pid, EStack));
+        {timeout, TRef, kill} ->
+            sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+            wait_children(Shutdown, Pids, Sz-1, undefined, EStack)
+    end.
+
+report_error(Error, Reason, Pid, #state{mfargs = MFA}) ->
+    SupName  = list_to_atom("esockd_connection_sup - " ++ pid_to_list(self())),
+    ErrorMsg = [{supervisor, SupName},
+                {errorContext, Error},
+                {reason, Reason},
+                {offender, [{pid, Pid},
+                            {name, connection},
+                            {mfargs, MFA}]}],
+    error_logger:error_report(supervisor_report, ErrorMsg).
+
+reply(Repy, State) ->
+    {reply, Repy, State}.
+

+ 10 - 29
src/emqx_sm.erl

@@ -37,6 +37,9 @@
 %% Internal function for stats
 %% Internal function for stats
 -export([stats_fun/0]).
 -export([stats_fun/0]).
 
 
+%% Internal function for emqx_session_sup
+-export([clean_down/1]).
+
 %% gen_server callbacks
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
          code_change/3]).
@@ -60,7 +63,7 @@ start_link() ->
 open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
 open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
     CleanStart = fun(_) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId, ConnPid),
                      ok = discard_session(ClientId, ConnPid),
-                     emqx_session:start_link(SessAttrs)
+                     emqx_session_sup:start_session(SessAttrs)
                  end,
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
     emqx_sm_locker:trans(ClientId, CleanStart);
 
 
@@ -70,7 +73,7 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
                           {ok, SessPid} ->
                           {ok, SessPid} ->
                               {ok, SessPid, true};
                               {ok, SessPid, true};
                           {error, not_found} ->
                           {error, not_found} ->
-                              emqx_session:start_link(SessAttrs)
+                              emqx_session_sup:start_session(SessAttrs)
                       end
                       end
                   end,
                   end,
     emqx_sm_locker:trans(ClientId, ResumeStart).
     emqx_sm_locker:trans(ClientId, ResumeStart).
@@ -130,8 +133,7 @@ register_session(ClientId) when is_binary(ClientId) ->
 register_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
 register_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
     Session = {ClientId, SessPid},
     Session = {ClientId, SessPid},
     true = ets:insert(?SESSION_TAB, Session),
     true = ets:insert(?SESSION_TAB, Session),
-    ok = emqx_sm_registry:register_session(Session),
-    notify({registered, ClientId, SessPid}).
+    emqx_sm_registry:register_session(Session).
 
 
 %% @doc Unregister a session
 %% @doc Unregister a session
 -spec(unregister_session(emqx_types:client_id()) -> ok).
 -spec(unregister_session(emqx_types:client_id()) -> ok).
@@ -140,11 +142,7 @@ unregister_session(ClientId) when is_binary(ClientId) ->
 
 
 -spec(unregister_session(emqx_types:client_id(), pid()) -> ok).
 -spec(unregister_session(emqx_types:client_id(), pid()) -> ok).
 unregister_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
 unregister_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
-    ok = do_unregister_session({ClientId, SessPid}),
-    notify({unregistered, SessPid}).
-
-%% @private
-do_unregister_session(Session) ->
+    Session = {ClientId, SessPid},
     true = ets:delete(?SESSION_STATS_TAB, Session),
     true = ets:delete(?SESSION_STATS_TAB, Session),
     true = ets:delete(?SESSION_ATTRS_TAB, Session),
     true = ets:delete(?SESSION_ATTRS_TAB, Session),
     true = ets:delete_object(?SESSION_P_TAB, Session),
     true = ets:delete_object(?SESSION_P_TAB, Session),
@@ -214,9 +212,6 @@ dispatch(ClientId, Topic, Msg) ->
             emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg])
             emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg])
     end.
     end.
 
 
-notify(Event) ->
-    gen_server:cast(?SM, {notify, Event}).
-
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -228,29 +223,16 @@ init([]) ->
     ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
     ok = emqx_stats:update_interval(sess_stats, fun ?MODULE:stats_fun/0),
     ok = emqx_stats:update_interval(sess_stats, fun ?MODULE:stats_fun/0),
-    {ok, #{sess_pmon => emqx_pmon:new()}}.
+    {ok, #{}}.
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
     {reply, ignored, State}.
 
 
-handle_cast({notify, {registered, ClientId, SessPid}}, State = #{sess_pmon := PMon}) ->
-    {noreply, State#{sess_pmon := emqx_pmon:monitor(SessPid, ClientId, PMon)}};
-
-handle_cast({notify, {unregistered, SessPid}}, State = #{sess_pmon := PMon}) ->
-    {noreply, State#{sess_pmon := emqx_pmon:demonitor(SessPid, PMon)}};
-
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
     {noreply, State}.
     {noreply, State}.
 
 
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{sess_pmon := PMon}) ->
-    SessPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
-    {Items, PMon1} = emqx_pmon:erase_all(SessPids, PMon),
-    ok = emqx_pool:async_submit(
-           fun lists:foreach/2, [fun clean_down/1, Items]),
-    {noreply, State#{sess_pmon := PMon1}};
-
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     emqx_logger:error("[SM] unexpected info: ~p", [Info]),
     emqx_logger:error("[SM] unexpected info: ~p", [Info]),
     {noreply, State}.
     {noreply, State}.
@@ -265,12 +247,11 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %% Internal functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-clean_down({SessPid, ClientId}) ->
-    Session = {ClientId, SessPid},
+clean_down(Session = {ClientId, SessPid}) ->
     case ets:member(?SESSION_TAB, ClientId)
     case ets:member(?SESSION_TAB, ClientId)
          orelse ets:member(?SESSION_ATTRS_TAB, Session) of
          orelse ets:member(?SESSION_ATTRS_TAB, Session) of
         true ->
         true ->
-            do_unregister_session(Session);
+            unregister_session(ClientId, SessPid);
         false -> ok
         false -> ok
     end.
     end.
 
 

+ 31 - 16
src/emqx_sm_sup.erl

@@ -26,25 +26,40 @@ start_link() ->
 
 
 init([]) ->
 init([]) ->
     %% Session locker
     %% Session locker
-    Locker = #{id       => locker,
-               start    => {emqx_sm_locker, start_link, []},
-               restart  => permanent,
+    Locker = #{id => locker,
+               start => {emqx_sm_locker, start_link, []},
+               restart => permanent,
                shutdown => 5000,
                shutdown => 5000,
-               type     => worker,
-               modules  => [emqx_sm_locker]},
+               type => worker,
+               modules => [emqx_sm_locker]
+              },
     %% Session registry
     %% Session registry
-    Registry = #{id       => registry,
-                 start    => {emqx_sm_registry, start_link, []},
-                 restart  => permanent,
+    Registry = #{id => registry,
+                 start => {emqx_sm_registry, start_link, []},
+                 restart => permanent,
                  shutdown => 5000,
                  shutdown => 5000,
-                 type     => worker,
-                 modules  => [emqx_sm_registry]},
+                 type => worker,
+                 modules => [emqx_sm_registry]
+                },
     %% Session Manager
     %% Session Manager
-    Manager = #{id       => manager,
-                start    => {emqx_sm, start_link, []},
-                restart  => permanent,
+    Manager = #{id => manager,
+                start => {emqx_sm, start_link, []},
+                restart => permanent,
                 shutdown => 5000,
                 shutdown => 5000,
-                type     => worker,
-                modules  => [emqx_sm]},
-    {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager]}}.
+                type => worker,
+                modules => [emqx_sm]
+               },
+    %% Session Sup
+    SessSpec = #{start => {emqx_session, start_link, []},
+                 shutdown => brutal_kill,
+                 clean_down => fun emqx_sm:clean_down/1
+                },
+    SessionSup = #{id => session_sup,
+                   start => {emqx_session_sup, start_link, [SessSpec ]},
+                   restart => transient,
+                   shutdown => infinity,
+                   type => supervisor,
+                   modules => [emqx_session_sup]
+                  },
+    {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}.