Просмотр исходного кода

feat(sessds): Store peername in the persistent session state

ieQu1 1 год назад
Родитель
Сommit
b6cc9177a6

+ 4 - 2
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -236,8 +236,9 @@ t_session_subscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
+            ConnInfo = #{peername => {undefined, undefined}},
             Session = erpc:call(
-                Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
+                Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
             ),
             ?assertMatch(
                 #{SubTopicFilter := #{}},
@@ -312,8 +313,9 @@ t_session_unsubscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
+            ConnInfo = #{peername => {undefined, undefined}},
             Session = erpc:call(
-                Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
+                Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]
             ),
             ?assertEqual(
                 #{},

+ 4 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -633,7 +633,10 @@ session_open(SessionId, NewConnInfo) ->
                     %% New connection being established
                     S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0),
                     S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
-                    S = emqx_persistent_session_ds_state:commit(S2),
+                    S3 = emqx_persistent_session_ds_state:set_peername(
+                        maps:get(peername, NewConnInfo), S2
+                    ),
+                    S = emqx_persistent_session_ds_state:commit(S3),
                     Inflight = emqx_persistent_session_ds_inflight:new(
                         receive_maximum(NewConnInfo)
                     ),

+ 1 - 0
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -74,5 +74,6 @@
 -define(expiry_interval, expiry_interval).
 %% Unique integer used to create unique identities
 -define(last_id, last_id).
+-define(peername, peername).
 
 -endif.

+ 11 - 1
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -30,6 +30,7 @@
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_expiry_interval/1, set_expiry_interval/2]).
+-export([get_peername/1, set_peername/2]).
 -export([new_id/1]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
 -export([get_seqno/2, put_seqno/3]).
@@ -92,7 +93,8 @@
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
         ?expiry_interval => non_neg_integer(),
-        ?last_id => integer()
+        ?last_id => integer(),
+        ?peername => emqx_types:peername()
     }.
 
 -type seqno_type() ::
@@ -278,6 +280,14 @@ get_expiry_interval(Rec) ->
 set_expiry_interval(Val, Rec) ->
     set_meta(?expiry_interval, Val, Rec).
 
+-spec get_peername(t()) -> emqx_types:peername() | undefined.
+get_peername(Rec) ->
+    get_meta(?peername, Rec).
+
+-spec set_peername(emqx_types:peername(), t()) -> t().
+set_peername(Val, Rec) ->
+    set_meta(?peername, Val, Rec).
+
 -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
 new_id(Rec) ->
     LastId =