瀏覽代碼

refactor(session): pass config to `SessionImpl:open/3` as well

* Anticipate that connection info may change during session takeover.
* Avoid persisting session conf as part of persistent session state.
Andrew Mayorov 2 年之前
父節點
當前提交
88103c5f0e

+ 5 - 5
apps/emqx/include/emqx_session_mem.hrl

@@ -26,9 +26,9 @@
     %% Client’s Subscriptions.
     subscriptions :: map(),
     %% Max subscriptions allowed
-    max_subscriptions :: non_neg_integer() | infinity,
+    max_subscriptions = infinity :: non_neg_integer() | infinity,
     %% Upgrade QoS?
-    upgrade_qos :: boolean(),
+    upgrade_qos = false :: boolean(),
     %% Client <- Broker: QoS1/2 messages sent to the client but
     %% have not been unacked.
     inflight :: emqx_inflight:inflight(),
@@ -40,14 +40,14 @@
     %% Next packet id of the session
     next_pkt_id = 1 :: emqx_types:packet_id(),
     %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
-    retry_interval :: timeout(),
+    retry_interval = 0 :: timeout(),
     %% Client -> Broker: QoS2 messages received from the client, but
     %% have not been completely acknowledged
     awaiting_rel :: map(),
     %% Maximum number of awaiting QoS2 messages allowed
-    max_awaiting_rel :: non_neg_integer() | infinity,
+    max_awaiting_rel = infinity :: non_neg_integer() | infinity,
     %% Awaiting PUBREL Timeout (Unit: millisecond)
-    await_rel_timeout :: timeout(),
+    await_rel_timeout = 0 :: timeout(),
     %% Created at
     created_at :: pos_integer()
 }).

+ 16 - 18
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -29,7 +29,7 @@
 %% Session API
 -export([
     create/3,
-    open/2,
+    open/3,
     destroy/1
 ]).
 
@@ -119,6 +119,8 @@
     conninfo := emqx_types:conninfo(),
     %% Timers
     timer() => reference(),
+    %% Upgrade QoS?
+    upgrade_qos := boolean(),
     %%
     props := map()
 }.
@@ -151,11 +153,12 @@
     session().
 create(#{clientid := ClientID}, ConnInfo, Conf) ->
     % TODO: expiration
-    ensure_timers(ensure_session(ClientID, ConnInfo, Conf)).
+    Session = ensure_timers(session_ensure_new(ClientID, ConnInfo)),
+    preserve_conf(ConnInfo, Conf, Session).
 
--spec open(clientinfo(), conninfo()) ->
+-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
     {_IsPresent :: true, session(), []} | false.
-open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
+open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) ->
     %% NOTE
     %% The fact that we need to concern about discarding all live channels here
     %% is essentially a consequence of the in-memory session design, where we
@@ -165,20 +168,16 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
     ok = emqx_cm:discard_session(ClientID),
     case session_open(ClientID, ConnInfo) of
         Session0 = #{} ->
-            ReceiveMaximum = receive_maximum(ConnInfo),
-            Session = Session0#{receive_maximum => ReceiveMaximum},
+            Session = preserve_conf(ConnInfo, Conf, Session0),
             {true, ensure_timers(Session), []};
         false ->
             false
     end.
 
-ensure_session(ClientID, ConnInfo, Conf) ->
-    Session = session_ensure_new(ClientID, ConnInfo, Conf),
-    ReceiveMaximum = receive_maximum(ConnInfo),
+preserve_conf(ConnInfo, Conf, Session) ->
     Session#{
-        conninfo => ConnInfo,
-        receive_maximum => ReceiveMaximum,
-        subscriptions => #{}
+        receive_maximum => receive_maximum(ConnInfo),
+        upgrade_qos => maps:get(upgrade_qos, Conf)
     }.
 
 -spec destroy(session() | clientinfo()) -> ok.
@@ -644,25 +643,24 @@ session_open(SessionId, NewConnInfo) ->
         end
     end).
 
--spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) ->
+-spec session_ensure_new(id(), emqx_types:conninfo()) ->
     session().
-session_ensure_new(SessionId, ConnInfo, Props) ->
+session_ensure_new(SessionId, ConnInfo) ->
     transaction(fun() ->
         ok = session_drop_subscriptions(SessionId),
-        Session = export_session(session_create(SessionId, ConnInfo, Props)),
+        Session = export_session(session_create(SessionId, ConnInfo)),
         Session#{
             subscriptions => #{},
             inflight => emqx_persistent_message_ds_replayer:new()
         }
     end).
 
-session_create(SessionId, ConnInfo, Props) ->
+session_create(SessionId, ConnInfo) ->
     Session = #session{
         id = SessionId,
         created_at = now_ms(),
         last_alive_at = now_ms(),
-        conninfo = ConnInfo,
-        props = Props
+        conninfo = ConnInfo
     },
     ok = mnesia:write(?SESSION_TAB, Session, write),
     Session.

+ 11 - 17
apps/emqx/src/emqx_session.erl

@@ -102,7 +102,7 @@
 -export([should_keep/1]).
 
 % Tests only
--export([get_session_conf/2]).
+-export([get_session_conf/1]).
 
 -export_type([
     t/0,
@@ -137,8 +137,6 @@
 -type conf() :: #{
     %% Max subscriptions allowed
     max_subscriptions := non_neg_integer() | infinity,
-    %% Max inflight messages allowed
-    max_inflight := non_neg_integer(),
     %% Maximum number of awaiting QoS2 messages allowed
     max_awaiting_rel := non_neg_integer() | infinity,
     %% Upgrade QoS?
@@ -171,7 +169,7 @@
 
 -callback create(clientinfo(), conninfo(), conf()) ->
     t().
--callback open(clientinfo(), conninfo()) ->
+-callback open(clientinfo(), conninfo(), conf()) ->
     {_IsPresent :: true, t(), _ReplayContext} | false.
 -callback destroy(t() | clientinfo()) -> ok.
 
@@ -181,7 +179,7 @@
 
 -spec create(clientinfo(), conninfo()) -> t().
 create(ClientInfo, ConnInfo) ->
-    Conf = get_session_conf(ClientInfo, ConnInfo),
+    Conf = get_session_conf(ClientInfo),
     create(ClientInfo, ConnInfo, Conf).
 
 create(ClientInfo, ConnInfo, Conf) ->
@@ -198,12 +196,12 @@ create(Mod, ClientInfo, ConnInfo, Conf) ->
 -spec open(clientinfo(), conninfo()) ->
     {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
 open(ClientInfo, ConnInfo) ->
-    Conf = get_session_conf(ClientInfo, ConnInfo),
+    Conf = get_session_conf(ClientInfo),
     Mods = [Default | _] = choose_impl_candidates(ConnInfo),
     %% NOTE
     %% Try to look the existing session up in session stores corresponding to the given
     %% `Mods` in order, starting from the last one.
-    case try_open(Mods, ClientInfo, ConnInfo) of
+    case try_open(Mods, ClientInfo, ConnInfo, Conf) of
         {_IsPresent = true, _, _} = Present ->
             Present;
         false ->
@@ -212,24 +210,20 @@ open(ClientInfo, ConnInfo) ->
             {false, create(Default, ClientInfo, ConnInfo, Conf)}
     end.
 
-try_open([Mod | Rest], ClientInfo, ConnInfo) ->
-    case try_open(Rest, ClientInfo, ConnInfo) of
+try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) ->
+    case try_open(Rest, ClientInfo, ConnInfo, Conf) of
         {_IsPresent = true, _, _} = Present ->
             Present;
         false ->
-            Mod:open(ClientInfo, ConnInfo)
+            Mod:open(ClientInfo, ConnInfo, Conf)
     end;
-try_open([], _ClientInfo, _ConnInfo) ->
+try_open([], _ClientInfo, _ConnInfo, _Conf) ->
     false.
 
--spec get_session_conf(clientinfo(), conninfo()) -> conf().
-get_session_conf(
-    #{zone := Zone},
-    #{receive_maximum := MaxInflight}
-) ->
+-spec get_session_conf(clientinfo()) -> conf().
+get_session_conf(_ClientInfo = #{zone := Zone}) ->
     #{
         max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
-        max_inflight => MaxInflight,
         max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel),
         upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
         retry_interval => get_mqtt_conf(Zone, retry_interval),

+ 30 - 14
apps/emqx/src/emqx_session_mem.erl

@@ -59,7 +59,7 @@
 
 -export([
     create/3,
-    open/2,
+    open/3,
     destroy/1
 ]).
 
@@ -152,24 +152,24 @@
 
 -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
     session().
-create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) ->
+create(
+    #{zone := Zone, clientid := ClientId},
+    #{expiry_interval := EI, receive_maximum := ReceiveMax},
+    Conf
+) ->
     QueueOpts = get_mqueue_conf(Zone),
-    #session{
+    Session = #session{
         id = emqx_guid:gen(),
         clientid = ClientId,
         created_at = erlang:system_time(millisecond),
         is_persistent = EI > 0,
         subscriptions = #{},
-        inflight = emqx_inflight:new(maps:get(max_inflight, Conf)),
+        inflight = emqx_inflight:new(ReceiveMax),
         mqueue = emqx_mqueue:init(QueueOpts),
         next_pkt_id = 1,
-        awaiting_rel = #{},
-        max_subscriptions = maps:get(max_subscriptions, Conf),
-        max_awaiting_rel = maps:get(max_awaiting_rel, Conf),
-        upgrade_qos = maps:get(upgrade_qos, Conf),
-        retry_interval = maps:get(retry_interval, Conf),
-        await_rel_timeout = maps:get(await_rel_timeout, Conf)
-    }.
+        awaiting_rel = #{}
+    },
+    preserve_conf(Conf, Session).
 
 get_mqueue_conf(Zone) ->
     #{
@@ -195,14 +195,16 @@ destroy(_Session) ->
 %% Open a (possibly existing) Session
 %%--------------------------------------------------------------------
 
--spec open(clientinfo(), conninfo()) ->
+-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
     {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false.
-open(ClientInfo = #{clientid := ClientId}, _ConnInfo) ->
+open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) ->
     case emqx_cm:takeover_session_begin(ClientId) of
         {ok, SessionRemote, TakeoverState} ->
-            Session = resume(ClientInfo, SessionRemote),
+            Session0 = resume(ClientInfo, SessionRemote),
             case emqx_cm:takeover_session_end(TakeoverState) of
                 {ok, Pendings} ->
+                    Session1 = resize_inflight(ConnInfo, Session0),
+                    Session = preserve_conf(Conf, Session1),
                     clean_session(ClientInfo, Session, Pendings);
                 {error, _} ->
                     % TODO log error?
@@ -212,6 +214,20 @@ open(ClientInfo = #{clientid := ClientId}, _ConnInfo) ->
             false
     end.
 
+resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = Inflight}) ->
+    Session#session{
+        inflight = emqx_inflight:resize(ReceiveMax, Inflight)
+    }.
+
+preserve_conf(Conf, Session = #session{}) ->
+    Session#session{
+        max_subscriptions = maps:get(max_subscriptions, Conf),
+        max_awaiting_rel = maps:get(max_awaiting_rel, Conf),
+        upgrade_qos = maps:get(upgrade_qos, Conf),
+        retry_interval = maps:get(retry_interval, Conf),
+        await_rel_timeout = maps:get(await_rel_timeout, Conf)
+    }.
+
 clean_session(ClientInfo, Session = #session{mqueue = Q}, Pendings) ->
     Q1 = emqx_mqueue:filter(fun emqx_session:should_keep/1, Q),
     Session1 = Session#session{mqueue = Q1},

+ 2 - 2
apps/emqx/test/emqx_session_mem_SUITE.erl

@@ -67,7 +67,7 @@ t_session_init(_) ->
     Session = emqx_session_mem:create(
         ClientInfo,
         ConnInfo,
-        emqx_session:get_session_conf(ClientInfo, ConnInfo)
+        emqx_session:get_session_conf(ClientInfo)
     ),
     ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)),
     ?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)),
@@ -531,7 +531,7 @@ session(InitFields) when is_map(InitFields) ->
     Session = emqx_session_mem:create(
         ClientInfo,
         ConnInfo,
-        emqx_session:get_session_conf(ClientInfo, ConnInfo)
+        emqx_session:get_session_conf(ClientInfo)
     ),
     maps:fold(
         fun(Field, Value, SessionAcc) ->

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl

@@ -54,7 +54,7 @@
 
 init(ClientInfo) ->
     ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
-    SessionConf = emqx_session:get_session_conf(ClientInfo, ConnInfo),
+    SessionConf = emqx_session:get_session_conf(ClientInfo),
     #{
         registry => emqx_mqttsn_registry:init(),
         session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf)