tigercl 7 лет назад
Родитель
Сommit
03f607c1b2
5 измененных файлов с 58 добавлено и 35 удалено
  1. 1 1
      priv/emqx.schema
  2. 32 9
      src/emqx_protocol.erl
  3. 18 18
      src/emqx_session.erl
  4. 6 6
      test/emqx_mock_client.erl
  5. 1 1
      test/emqx_sm_SUITE.erl

+ 1 - 1
priv/emqx.schema

@@ -797,7 +797,7 @@ end}.
 %% @doc Session Expiry Interval
 {mapping, "zone.$name.session_expiry_interval", "emqx.zones", [
   {default, "2h"},
-  {datatype, {duration, ms}}
+  {datatype, {duration, s}}
 ]}.
 
 %% @doc Type: simple | priority

+ 32 - 9
src/emqx_protocol.erl

@@ -410,9 +410,17 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
 process_packet(?PACKET(?PINGREQ), PState) ->
     send(?PACKET(?PINGRESP), PState);
 
-process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
-    %% Clean willmsg
-    {stop, normal, PState#pstate{will_msg = undefined}};
+process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), 
+                PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) ->
+    case Interval =/= 0 andalso OldInterval =:= 0 of
+        true -> 
+            deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
+            {error, protocol_error, PState};
+        false -> 
+            emqx_session:update_expiry_interval(SPid, Interval),
+            %% Clean willmsg
+            {stop, normal, PState#pstate{will_msg = undefined}}
+    end;
 process_packet(?DISCONNECT_PACKET(_), PState) ->
     {stop, normal, PState}.
 
@@ -562,17 +570,32 @@ maybe_assign_client_id(PState) ->
     PState.
 
 try_open_session(#pstate{zone        = Zone,
+                         proto_ver   = ProtoVer,
                          client_id   = ClientId,
                          conn_pid    = ConnPid,
                          conn_props  = ConnProps,
                          username    = Username,
                          clean_start = CleanStart}) ->
-    case emqx_sm:open_session(#{zone        => Zone,
-                                client_id   => ClientId,
-                                conn_pid    => ConnPid,
-                                username    => Username,
-                                clean_start => CleanStart,
-                                conn_props  => ConnProps}) of
+
+    SessAttrs = #{
+        zone        => Zone,
+        client_id   => ClientId,
+        conn_pid    => ConnPid,
+        username    => Username,
+        clean_start => CleanStart
+    },
+
+    case emqx_sm:open_session(maps:put(expiry_interval, if 
+                                ProtoVer =:= ?MQTT_PROTO_V5 ->
+                                    maps:get('Session-Expiry-Interval', ConnProps, 0);
+                                true ->
+                                    case CleanStart of
+                                        true ->
+                                            0;
+                                        false ->
+                                            emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
+                                    end  
+                            end, SessAttrs)) of
         {ok, SPid} ->
             {ok, SPid, false};
         Other -> Other

+ 18 - 18
src/emqx_session.erl

@@ -47,6 +47,7 @@
 -export([info/1, attrs/1]).
 -export([stats/1]).
 -export([resume/2, discard/2]).
+-export([update_expiry_interval/2]).
 -export([subscribe/2, subscribe/4]).
 -export([publish/3]).
 -export([puback/2, puback/3]).
@@ -313,6 +314,10 @@ resume(SPid, ConnPid) ->
 discard(SPid, ByPid) ->
     gen_server:call(SPid, {discard, ByPid}, infinity).
 
+-spec(update_expiry_interval(spid(), timeout()) -> ok).
+update_expiry_interval(SPid, Interval) ->
+    gen_server:cast(SPid, {expiry_interval, Interval * 1000}).
+
 -spec(close(spid()) -> ok).
 close(SPid) ->
     gen_server:call(SPid, close, infinity).
@@ -321,12 +326,12 @@ close(SPid) ->
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
 
-init([Parent, #{zone        := Zone,
-                client_id   := ClientId,
-                username    := Username,
-                conn_pid    := ConnPid,
-                clean_start := CleanStart,
-                conn_props  := ConnProps}]) ->
+init([Parent, #{zone            := Zone,
+                client_id       := ClientId,
+                username        := Username,
+                conn_pid        := ConnPid,
+                clean_start     := CleanStart,
+                expiry_interval := ExpiryInterval}]) ->
     process_flag(trap_exit, true),
     true = link(ConnPid),
     MaxInflight = get_env(Zone, max_inflight),
@@ -346,7 +351,7 @@ init([Parent, #{zone        := Zone,
                    awaiting_rel      = #{},
                    await_rel_timeout = get_env(Zone, await_rel_timeout),
                    max_awaiting_rel  = get_env(Zone, max_awaiting_rel),
-                   expiry_interval   = expire_interval(Zone, ConnProps),
+                   expiry_interval   = ExpiryInterval,
                    enable_stats      = get_env(Zone, enable_stats, true),
                    deliver_stats     = 0,
                    enqueue_stats     = 0,
@@ -361,11 +366,6 @@ init([Parent, #{zone        := Zone,
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 
-expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
-    I * 1000;
-expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
-    get_env(Zone, session_expiry_interval, 0).
-
 init_mqueue(Zone) ->
     emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
                        max_len => get_env(Zone, max_mqueue_len, 1000),
@@ -540,6 +540,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
     %% Replay delivery and Dequeue pending messages
     noreply(dequeue(retry_delivery(true, State1)));
 
+handle_cast({expiry_interval, Interval}, State) ->
+    {noreply, State#state{expiry_interval = Interval}};
+
 handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     {noreply, State}.
@@ -591,13 +594,10 @@ handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
     ?LOG(info, "expired, shutdown now:(", [], State),
     shutdown(expired, State);
 
-handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
-    {stop, Reason, State#state{conn_pid = undefined}};
-
 handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
     {stop, Reason, State#state{conn_pid = undefined}};
 
-handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
+handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
     {noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
 
 handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
@@ -876,8 +876,8 @@ ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) ->
 ensure_retry_timer(_Timeout, State) ->
     State.
 
-ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
-    State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
+ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 andalso Interval =/= 16#ffffffff ->
+    State#state{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)};
 ensure_expire_timer(State) ->
     State.
 

+ 6 - 6
test/emqx_mock_client.erl

@@ -53,12 +53,12 @@ init([ClientId]) ->
     }.
 
 handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
-    Attrs = #{ zone        => Zone,
-               client_id   => ClientId,
-               conn_pid  => ClientPid,
-               clean_start => true,
-               username    => undefined,
-               conn_props  => undefined
+    Attrs = #{ zone             => Zone,
+               client_id        => ClientId,
+               conn_pid         => ClientPid,
+               clean_start      => true,
+               username         => undefined,
+               expiry_interval  => 0
              },
     {ok, SessPid} = emqx_sm:open_session(Attrs),
     {reply, {ok, SessPid}, State#state{

+ 1 - 1
test/emqx_sm_SUITE.erl

@@ -25,7 +25,7 @@ t_open_close_session(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
     Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
-              zone => internal, username => <<"zhou">>, conn_props => #{}},
+              zone => internal, username => <<"zhou">>, expiry_interval => 0},
     {ok, SPid} = emqx_sm:open_session(Attrs),
     [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
     SPid = emqx_sm:lookup_session_pid(<<"client">>),