Sfoglia il codice sorgente

refactor(ds): rename `disconnected_at` to `last_alive_at`, add more assertions

Thales Macedo Garitezi 2 anni fa
parent
commit
09c4e40511

+ 13 - 0
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -9,6 +9,7 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
@@ -428,9 +429,13 @@ do_t_session_expiration(_Config, Opts) ->
     CommonParams = #{proto_ver => v5, clientid => ClientId},
     ?check_trace(
         begin
+            Topic = <<"some/topic">>,
             Params0 = maps:merge(CommonParams, FirstConn),
             Client0 = start_client(Params0),
             {ok, _} = emqtt:connect(Client0),
+            {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2),
+            Subs0 = emqx_persistent_session_ds:list_all_subscriptions(),
+            ?assertEqual(1, map_size(Subs0), #{subs => Subs0}),
             Info0 = maps:from_list(emqtt:info(Client0)),
             ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}),
             emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn),
@@ -440,6 +445,8 @@ do_t_session_expiration(_Config, Opts) ->
             {ok, _} = emqtt:connect(Client1),
             Info1 = maps:from_list(emqtt:info(Client1)),
             ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}),
+            Subs1 = emqtt:subscriptions(Client1),
+            ?assertEqual([], Subs1),
             emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn),
 
             ct:sleep(1_500),
@@ -449,6 +456,12 @@ do_t_session_expiration(_Config, Opts) ->
             {ok, _} = emqtt:connect(Client2),
             Info2 = maps:from_list(emqtt:info(Client2)),
             ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}),
+            Subs2 = emqtt:subscriptions(Client2),
+            ?assertEqual([], Subs2),
+            emqtt:publish(Client2, Topic, <<"payload">>),
+            ?assertNotReceive({publish, #{topic := Topic}}),
+            %% ensure subscriptions are absent from table.
+            ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
             emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn),
 
             ok

+ 16 - 16
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -98,8 +98,8 @@
     id := id(),
     %% When the session was created
     created_at := timestamp(),
-    %% When the client last disconnected
-    disconnected_at := timestamp() | never,
+    %% When the client was last considered alive
+    last_alive_at := timestamp(),
     %% Client’s Subscriptions.
     subscriptions := #{topic_filter() => subscription()},
     %% Inflight messages
@@ -126,10 +126,10 @@
     next_pkt_id
 ]).
 
--define(IS_EXPIRED(NOW_MS, DISCONNECTED_AT, EI),
-    (is_number(DisconnectedAt) andalso
+-define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI),
+    (is_number(LAST_ALIVE_AT) andalso
         is_number(EI) andalso
-        (NowMS >= DisconnectedAt + EI))
+        (NOW_MS >= LAST_ALIVE_AT + EI))
 ).
 
 -export_type([id/0]).
@@ -408,7 +408,7 @@ replay(_ClientInfo, [], Session = #{inflight := Inflight0}) ->
 
 -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
 disconnect(Session0, ConnInfo) ->
-    Session = session_set_disconnected_at_trans(Session0, ConnInfo, now_ms()),
+    Session = session_set_last_alive_at_trans(Session0, ConnInfo, now_ms()),
     {shutdown, Session}.
 
 -spec terminate(Reason :: term(), session()) -> ok.
@@ -544,16 +544,16 @@ session_open(SessionId, NewConnInfo) ->
     NowMS = now_ms(),
     transaction(fun() ->
         case mnesia:read(?SESSION_TAB, SessionId, write) of
-            [Record0 = #session{disconnected_at = DisconnectedAt, conninfo = ConnInfo}] ->
+            [Record0 = #session{last_alive_at = LastAliveAt, conninfo = ConnInfo}] ->
                 EI = expiry_interval(ConnInfo),
-                case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of
+                case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of
                     true ->
                         session_drop(SessionId),
                         false;
                     false ->
                         %% new connection being established
                         Record1 = Record0#session{conninfo = NewConnInfo},
-                        Record = session_set_disconnected_at(Record1, never),
+                        Record = session_set_last_alive_at(Record1, never),
                         Session = export_session(Record),
                         DSSubs = session_read_subscriptions(SessionId),
                         Subscriptions = export_subscriptions(DSSubs),
@@ -585,30 +585,30 @@ session_create(SessionId, ConnInfo, Props) ->
     Session = #session{
         id = SessionId,
         created_at = now_ms(),
-        disconnected_at = never,
+        last_alive_at = now_ms(),
         conninfo = ConnInfo,
         props = Props
     },
     ok = mnesia:write(?SESSION_TAB, Session, write),
     Session.
 
-session_set_disconnected_at_trans(Session, NewConnInfo, DisconnectedAt) ->
+session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) ->
     #{id := SessionId} = Session,
     transaction(fun() ->
         case mnesia:read(?SESSION_TAB, SessionId, write) of
             [#session{} = SessionRecord0] ->
                 SessionRecord = SessionRecord0#session{conninfo = NewConnInfo},
-                _ = session_set_disconnected_at(SessionRecord, DisconnectedAt),
+                _ = session_set_last_alive_at(SessionRecord, LastAliveAt),
                 ok;
             _ ->
                 %% log and crash?
                 ok
         end
     end),
-    Session#{conninfo := NewConnInfo, disconnected_at := DisconnectedAt}.
+    Session#{conninfo := NewConnInfo, last_alive_at := LastAliveAt}.
 
-session_set_disconnected_at(SessionRecord0, DisconnectedAt) ->
-    SessionRecord = SessionRecord0#session{disconnected_at = DisconnectedAt},
+session_set_last_alive_at(SessionRecord0, LastAliveAt) ->
+    SessionRecord = SessionRecord0#session{last_alive_at = LastAliveAt},
     ok = mnesia:write(?SESSION_TAB, SessionRecord, write),
     SessionRecord.
 
@@ -849,7 +849,7 @@ export_subscriptions(DSSubs) ->
     ).
 
 export_session(#session{} = Record) ->
-    export_record(Record, #session.id, [id, created_at, disconnected_at, conninfo, props], #{}).
+    export_record(Record, #session.id, [id, created_at, last_alive_at, conninfo, props], #{}).
 
 export_subscription(#ds_sub{} = Record) ->
     export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}).

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -73,7 +73,7 @@
     id :: emqx_persistent_session_ds:id(),
     %% creation time
     created_at :: _Millisecond :: non_neg_integer(),
-    disconnected_at = never :: _Millisecond :: non_neg_integer() | never,
+    last_alive_at :: _Millisecond :: non_neg_integer(),
     conninfo :: emqx_types:conninfo(),
     %% for future usage
     props = #{} :: map()