Просмотр исходного кода

Remove emqx_session_sup to handle massive concurrent sessions

Feng Lee 7 лет назад
Родитель
Сommit
4aaf0a7db4
3 измененных файлов с 27 добавлено и 19 удалено
  1. 25 14
      src/emqx_session.erl
  2. 2 2
      src/emqx_sm.erl
  3. 0 3
      src/emqx_sup.erl

+ 25 - 14
src/emqx_session.erl

@@ -370,8 +370,8 @@ init([Parent, #{zone                := Zone,
                    topic_alias_maximum = TopicAliasMaximum,
                    will_msg            = WillMsg
                   },
-    emqx_sm:register_session(ClientId, attrs(State)),
-    emqx_sm:set_session_stats(ClientId, stats(State)),
+    ok = emqx_sm:register_session(ClientId, attrs(State)),
+    true = emqx_sm:set_session_stats(ClientId, stats(State)),
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     ok = emqx_gc:init(GcPolicy),
@@ -617,14 +617,20 @@ handle_info({timeout, Timer, emit_stats},
             ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
             shutdown(Reason, NewState)
     end;
+
 handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
-    ?LOG(info, "expired, shutdown now:(", [], State),
+    ?LOG(info, "expired, shutdown now.", [], State),
     shutdown(expired, State);
 
 handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
     send_willmsg(WillMsg),
     {noreply, State#state{will_msg = undefined}};
 
+%% ConnPid is shutting down by the supervisor.
+handle_info({'EXIT', ConnPid, Reason}, #state{conn_pid = ConnPid})
+    when Reason =:= killed; Reason =:= shutdown ->
+    exit(Reason);
+
 handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
     send_willmsg(WillMsg),
     {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
@@ -641,30 +647,35 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
     ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
          [ConnPid, Pid, Reason], State),
     {noreply, State};
+
 handle_info(Info, State) ->
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(Reason, #state{will_msg = WillMsg, conn_pid = ConnPid}) ->
-    %% Should not run hooks here.
-    %% emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
-    %% Let it crash.
-    %% emqx_sm:unregister_session(ClientId),
+terminate(Reason, #state{will_msg = WillMsg,
+                         client_id = ClientId,
+                         conn_pid = ConnPid,
+                         old_conn_pid = OldConnPid}) ->
     send_willmsg(WillMsg),
-    %% Ensure to shutdown the connection
-    if
-        ConnPid == undefined -> ok;
-        true -> ConnPid ! {shutdown, Reason}
-    end.
+    [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
+    emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+maybe_shutdown(undefined, _Reason) ->
+    ok;
+maybe_shutdown(Pid, normal) ->
+     Pid ! {shutdown, normal};
+maybe_shutdown(Pid, Reason) ->
+    exit(Pid, Reason).
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-has_connection(#state{conn_pid = Pid}) -> is_pid(Pid) andalso is_process_alive(Pid).
+has_connection(#state{conn_pid = Pid}) ->
+    is_pid(Pid) andalso is_process_alive(Pid).
 
 handle_dispatch(Topic, Msg = #message{headers = Headers},
                 State = #state{subscriptions = SubMap,

+ 2 - 2
src/emqx_sm.erl

@@ -55,7 +55,7 @@ start_link() ->
 open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId, ConnPid),
-                     emqx_session_sup:start_session(SessAttrs)
+                     emqx_session:start_link(SessAttrs)
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
 
@@ -65,7 +65,7 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
                           {ok, SPid} ->
                               {ok, SPid, true};
                           {error, not_found} ->
-                              emqx_session_sup:start_session(SessAttrs)
+                              emqx_session:start_link(SessAttrs)
                       end
                   end,
     emqx_sm_locker:trans(ClientId, ResumeStart).

+ 0 - 3
src/emqx_sup.erl

@@ -69,8 +69,6 @@ init([]) ->
     AccessControl = worker_spec(emqx_access_control),
     %% Session Manager
     SMSup = supervisor_spec(emqx_sm_sup),
-    %% Session Sup
-    SessionSup = supervisor_spec(emqx_session_sup),
     %% Connection Manager
     CMSup = supervisor_spec(emqx_cm_sup),
     %% Sys Sup
@@ -83,7 +81,6 @@ init([]) ->
            BridgeSup,
            AccessControl,
            SMSup,
-           SessionSup,
            CMSup,
            SysSup]}}.