Просмотр исходного кода

feat(ds): make durable sessions handle will messages

Fixes https://emqx.atlassian.net/browse/EMQX-10431
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
0f426e6e77

+ 62 - 11
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -25,7 +25,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:clear_screen(),
     TCApps = emqx_cth_suite:start(
         app_specs(),
         #{work_dir => emqx_cth_suite:work_dir(Config)}
@@ -144,6 +143,7 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
 
 start_client(Opts0 = #{}) ->
     Defaults = #{
+        port => 1883,
         proto_ver => v5,
         properties => #{'Session-Expiry-Interval' => 300}
     },
@@ -190,6 +190,23 @@ list_all_subscriptions(Node) ->
 list_all_pubranges(Node) ->
     erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
 
+session_open(Node, ClientId) ->
+    ClientInfo = #{},
+    ConnInfo = #{peername => {undefined, undefined}},
+    WillMsg = undefined,
+    erpc:call(
+        Node,
+        emqx_persistent_session_ds,
+        session_open,
+        [ClientId, ClientInfo, ConnInfo, WillMsg]
+    ).
+
+force_last_alive_at(ClientId, Time) ->
+    {ok, S0} = emqx_persistent_session_ds_state:open(ClientId),
+    S = emqx_persistent_session_ds_state:set_last_alive_at(Time, S0),
+    _ = emqx_persistent_session_ds_state:commit(S),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -244,11 +261,7 @@ t_session_subscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            ConnInfo = #{peername => {undefined, undefined}},
-            WillMsg = undefined,
-            Session = erpc:call(
-                Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg]
-            ),
+            Session = session_open(Node1, ClientId),
             ?assertMatch(
                 #{SubTopicFilter := #{}},
                 emqx_session:info(subscriptions, Session)
@@ -322,11 +335,7 @@ t_session_unsubscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            ConnInfo = #{peername => {undefined, undefined}},
-            WillMsg = undefined,
-            Session = erpc:call(
-                Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo, WillMsg]
-            ),
+            Session = session_open(Node1, ClientId),
             ?assertEqual(
                 #{},
                 emqx_session:info(subscriptions, Session)
@@ -555,6 +564,7 @@ t_session_gc(Config) ->
             ),
 
             %% Clients are still alive; no session is garbage collected.
+            ?tp(notice, "waiting for gc", #{}),
             ?assertMatch(
                 {ok, _},
                 ?block_until(
@@ -567,9 +577,11 @@ t_session_gc(Config) ->
             ),
             ?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
             ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
+            ?tp(notice, "gc ran", #{}),
 
             %% Now we disconnect 2 of them; only those should be GC'ed.
 
+            ?tp(notice, "disconnecting client1", #{}),
             ?assertMatch(
                 {ok, {ok, _}},
                 ?wait_async_action(
@@ -674,3 +686,42 @@ t_session_replay_retry(_Config) ->
         [maps:with([topic, payload, qos], P) || P <- Pubs0],
         [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2]
     ).
+
+%% Check that we send will messages when performing GC without relying on timers set by
+%% the channel process.
+t_session_gc_will_message(_Config) ->
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            WillTopic = <<"will/t">>,
+            ok = emqx:subscribe(WillTopic, #{qos => 2}),
+            ClientId = <<"will_msg_client">>,
+            Client = start_client(#{
+                clientid => ClientId,
+                will_topic => WillTopic,
+                will_payload => <<"will payload">>,
+                will_qos => 0,
+                will_props => #{'Will-Delay-Interval' => 300}
+            }),
+            {ok, _} = emqtt:connect(Client),
+            %% Use reason code =/= `?RC_SUCCESS' to allow will message
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
+                    #{?snk_kind := emqx_cm_clean_down}
+                ),
+            ?assertNotReceive({deliver, WillTopic, _}),
+            %% Set fake `last_alive_at' to trigger immediate will message.
+            force_last_alive_at(ClientId, _Time = 0),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_persistent_session_ds_gc_worker:check_session(ClientId),
+                    #{?snk_kind := session_gc_published_will_msg}
+                ),
+            ?assertReceive({deliver, WillTopic, _}),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -12,6 +12,7 @@
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_cm,2}.
+{emqx_cm,3}.
 {emqx_conf,1}.
 {emqx_conf,2}.
 {emqx_conf,3}.

+ 65 - 25
apps/emqx/src/emqx_channel.erl

@@ -64,6 +64,12 @@
     maybe_nack/1
 ]).
 
+%% Export for DS session GC worker and session implementations
+-export([
+    will_delay_interval/1,
+    prepare_will_message_for_publishing/2
+]).
+
 %% Exports for CT
 -export([set_field/3]).
 
@@ -885,9 +891,10 @@ do_unsubscribe(
 %%--------------------------------------------------------------------
 
 %% MQTT-v5.0: 3.14.4 DISCONNECT Actions
-maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
+maybe_clean_will_msg(?RC_SUCCESS, Channel = #channel{session = Session0}) ->
     %% [MQTT-3.14.4-3]
-    Channel#channel{will_msg = undefined};
+    Session = emqx_session:clear_will_message(Session0),
+    Channel#channel{will_msg = undefined, session = Session};
 maybe_clean_will_msg(_ReasonCode, Channel) ->
     Channel.
 
@@ -1204,6 +1211,9 @@ handle_call(
     ),
     Channel0 = maybe_publish_will_msg(takenover, Channel),
     disconnect_and_shutdown(takenover, AllPendings, Channel0);
+handle_call(takeover_kick, Channel) ->
+    Channel0 = maybe_publish_will_msg(takenover, Channel),
+    disconnect_and_shutdown(takenover, ok, Channel0);
 handle_call(list_authz_cache, Channel) ->
     {reply, emqx_authz_cache:list_authz_cache(), Channel};
 handle_call(
@@ -2301,17 +2311,17 @@ maybe_publish_will_msg(
     Channel;
 maybe_publish_will_msg(
     _Reason,
-    Channel = #channel{
+    Channel0 = #channel{
         conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}
     }
 ) ->
     %% Unconditionally publish will message for MQTT 3.1.1
     ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
-    _ = publish_will_msg(Channel),
-    Channel#channel{will_msg = undefined};
+    Channel = publish_will_msg(Channel0),
+    remove_willmsg(Channel);
 maybe_publish_will_msg(
     Reason,
-    Channel = #channel{
+    Channel0 = #channel{
         conninfo = #{clientid := ClientId}
     }
 ) when
@@ -2329,12 +2339,20 @@ maybe_publish_will_msg(
     %% d. internal_error (maybe not recoverable)
     %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
     %% OR fired but not yet handled
-    ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
-    _ = publish_will_msg(Channel),
-    remove_willmsg(Channel);
+    %% NOTE! For durable sessions, `?chan_terminating' does NOT imply that the session is
+    %% gone.
+    case is_durable_session(Channel0) andalso Reason =:= ?chan_terminating of
+        false ->
+            ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
+
+            Channel = publish_will_msg(Channel0),
+            remove_willmsg(Channel);
+        true ->
+            Channel0
+    end;
 maybe_publish_will_msg(
     takenover,
-    Channel = #channel{
+    Channel0 = #channel{
         will_msg = WillMsg,
         conninfo = #{clientid := ClientId}
     }
@@ -2352,7 +2370,8 @@ maybe_publish_will_msg(
     case will_delay_interval(WillMsg) of
         0 ->
             ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
-            _ = publish_will_msg(Channel);
+            Channel = publish_will_msg(Channel0),
+            ok;
         I when I > 0 ->
             %% @NOTE Non-normative comment in MQTT 5.0 spec
             %% """
@@ -2361,12 +2380,13 @@ maybe_publish_will_msg(
             %% before the Will Message is published.
             %% """
             ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}),
+            Channel = Channel0,
             skip
     end,
     remove_willmsg(Channel);
 maybe_publish_will_msg(
     Reason,
-    Channel = #channel{
+    Channel0 = #channel{
         will_msg = WillMsg,
         conninfo = #{clientid := ClientId}
     }
@@ -2377,11 +2397,11 @@ maybe_publish_will_msg(
             ?tp(debug, maybe_publish_will_msg_other_publish, #{
                 clientid => ClientId, reason => Reason
             }),
-            _ = publish_will_msg(Channel),
+            Channel = publish_will_msg(Channel0),
             remove_willmsg(Channel);
         I when I > 0 ->
             ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
-            ensure_timer(will_message, timer:seconds(I), Channel)
+            ensure_timer(will_message, timer:seconds(I), Channel0)
     end.
 
 will_delay_interval(WillMsg) ->
@@ -2394,15 +2414,15 @@ will_delay_interval(WillMsg) ->
 publish_will_msg(
     #channel{
         session = Session,
-        clientinfo = ClientInfo = #{mountpoint := MountPoint},
+        clientinfo = ClientInfo,
         will_msg = Msg = #message{topic = Topic}
-    }
+    } = Channel
 ) ->
-    Action = authz_action(Msg),
-    PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
-    ClientBanned = emqx_banned:check(ClientInfo),
-    case PublishingDisallowed orelse ClientBanned of
-        true ->
+    case prepare_will_message_for_publishing(ClientInfo, Msg) of
+        {ok, PreparedMessage} ->
+            NSession = emqx_session:publish_will_message_now(Session, PreparedMessage),
+            Channel#channel{session = NSession};
+        {error, #{client_banned := ClientBanned, publishing_disallowed := PublishingDisallowed}} ->
             ?tp(
                 warning,
                 last_will_testament_publish_denied,
@@ -2412,12 +2432,23 @@ publish_will_msg(
                     publishing_disallowed => PublishingDisallowed
                 }
             ),
-            ok;
+            Channel
+    end.
+
+prepare_will_message_for_publishing(
+    ClientInfo = #{mountpoint := MountPoint},
+    Msg = #message{topic = Topic}
+) ->
+    Action = authz_action(Msg),
+    PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
+    ClientBanned = emqx_banned:check(ClientInfo),
+    case PublishingDisallowed orelse ClientBanned of
+        true ->
+            {error, #{client_banned => ClientBanned, publishing_disallowed => PublishingDisallowed}};
         false ->
             NMsg = emqx_mountpoint:mount(MountPoint, Msg),
-            NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)},
-            ok = emqx_session:publish_will_message(Session, NMsg2),
-            ok
+            PreparedMessage = NMsg#message{timestamp = emqx_message:timestamp_now()},
+            {ok, PreparedMessage}
     end.
 
 %%--------------------------------------------------------------------
@@ -2529,6 +2560,15 @@ remove_willmsg(Channel = #channel{timers = Timers}) ->
                 timers = maps:remove(will_message, Timers)
             }
     end.
+
+is_durable_session(#channel{session = Session}) ->
+    case emqx_session:info(impl, Session) of
+        emqx_persistent_session_ds ->
+            true;
+        _ ->
+            false
+    end.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 74 - 3
apps/emqx/src/emqx_cm.erl

@@ -53,7 +53,8 @@
     takeover_session_begin/1,
     takeover_session_end/1,
     kick_session/1,
-    kick_session/2
+    kick_session/2,
+    takeover_kick/1
 ]).
 
 -export([
@@ -100,6 +101,7 @@
     takeover_session/2,
     takeover_finish/2,
     do_kick_session/3,
+    do_takeover_kick_session_v3/2,
     do_get_chan_info/2,
     do_get_chan_stats/2,
     do_get_chann_conn_mod/2
@@ -122,6 +124,8 @@
 
 -type takeover_state() :: {_ConnMod :: module(), _ChanPid :: pid()}.
 
+-define(BPAPI_NAME, emqx_cm).
+
 -define(CHAN_STATS, [
     {?CHAN_TAB, 'channels.count', 'channels.max'},
     {?CHAN_TAB, 'sessions.count', 'sessions.max'},
@@ -352,6 +356,38 @@ pick_channel(ClientId) ->
             ChanPid
     end.
 
+%% Used by `emqx_persistent_session_ds'
+-spec takeover_kick(emqx_types:clientid()) -> ok.
+takeover_kick(ClientId) ->
+    case lookup_channels(ClientId) of
+        [] ->
+            ok;
+        ChanPids ->
+            lists:foreach(
+                fun(Pid) ->
+                    do_takeover_session(ClientId, Pid)
+                end,
+                ChanPids
+            )
+    end.
+
+%% Used by `emqx_persistent_session_ds'.
+%% We stop any running channels with reason `takenover' so that correct reason codes and
+%% will message processing may take place.  For older BPAPI nodes, we don't have much
+%% choice other than calling the old `discard_session' code.
+do_takeover_session(ClientId, Pid) ->
+    Node = node(Pid),
+    case emqx_bpapi:supported_version(Node, ?BPAPI_NAME) of
+        undefined ->
+            %% Race: node (re)starting? Assume v2.
+            discard_session(ClientId, Pid);
+        Vsn when Vsn =< 2 ->
+            discard_session(ClientId, Pid);
+        _Vsn ->
+            takeover_kick_session(ClientId, Pid)
+    end.
+
+%% Used only by `emqx_session_mem'
 takeover_finish(ConnMod, ChanPid) ->
     request_stepdown(
         {takeover, 'end'},
@@ -360,6 +396,7 @@ takeover_finish(ConnMod, ChanPid) ->
     ).
 
 %% @doc RPC Target @ emqx_cm_proto_v2:takeover_session/2
+%% Used only by `emqx_session_mem'
 takeover_session(ClientId, Pid) ->
     try
         do_takeover_begin(ClientId, Pid)
@@ -415,7 +452,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
     | {ok, emqx_session:t() | _ReplayContext}
     | {error, term()}
 when
-    Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
+    Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
 request_stepdown(Action, ConnMod, Pid) ->
     Timeout =
         case Action == kick orelse Action == discard of
@@ -496,7 +533,19 @@ do_kick_session(Action, ClientId, ChanPid) when node(ChanPid) =:= node() ->
             ok = request_stepdown(Action, ConnMod, ChanPid)
     end.
 
-%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
+%% @doc RPC Target for emqx_cm_proto_v3:takeover_kick_session/3
+-spec do_takeover_kick_session_v3(emqx_types:clientid(), chan_pid()) -> ok.
+do_takeover_kick_session_v3(ClientId, ChanPid) when node(ChanPid) =:= node() ->
+    case do_get_chann_conn_mod(ClientId, ChanPid) of
+        undefined ->
+            %% already deregistered
+            ok;
+        ConnMod when is_atom(ConnMod) ->
+            ok = request_stepdown(takeover_kick, ConnMod, ChanPid)
+    end.
+
+%% @private This function is shared for session `kick' and `discard' (as the first arg
+%% Action).
 kick_session(Action, ClientId, ChanPid) ->
     try
         wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid))
@@ -519,6 +568,28 @@ kick_session(Action, ClientId, ChanPid) ->
             )
     end.
 
+takeover_kick_session(ClientId, ChanPid) ->
+    try
+        wrap_rpc(emqx_cm_proto_v3:takeover_kick_session(ClientId, ChanPid))
+    catch
+        Error:Reason ->
+            %% This should mostly be RPC failures.
+            %% However, if the node is still running the old version
+            %% code (prior to emqx app 4.3.10) some of the RPC handler
+            %% exceptions may get propagated to a new version node
+            ?SLOG(
+                error,
+                #{
+                    msg => "failed_to_kick_session_on_remote_node",
+                    node => node(ChanPid),
+                    action => takeover,
+                    error => Error,
+                    reason => Reason
+                },
+                #{clientid => ClientId}
+            )
+    end.
+
 kick_session(ClientId) ->
     case lookup_channels(ClientId) of
         [] ->

+ 41 - 17
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -68,7 +68,8 @@
 
 %% Will message handling
 -export([
-    publish_will_message/2
+    clear_will_message/1,
+    publish_will_message_now/2
 ]).
 
 %% Managment APIs:
@@ -93,7 +94,7 @@
 
 -ifdef(TEST).
 -export([
-    session_open/3,
+    session_open/4,
     list_all_sessions/0
 ]).
 -endif.
@@ -189,20 +190,20 @@
 
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
     session().
-create(#{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) ->
-    ensure_timers(session_ensure_new(ClientID, ConnInfo, MaybeWillMsg, Conf)).
+create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
+    ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)).
 
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
     {_IsPresent :: true, session(), []} | false.
-open(#{clientid := ClientID} = _ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
+open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
     %% NOTE
     %% The fact that we need to concern about discarding all live channels here
     %% is essentially a consequence of the in-memory session design, where we
     %% have disconnected channels holding onto session state. Ideally, we should
     %% somehow isolate those idling not-yet-expired sessions into a separate process
     %% space, and move this call back into `emqx_cm` where it belongs.
-    ok = emqx_cm:discard_session(ClientID),
-    case session_open(ClientID, ConnInfo, MaybeWillMsg) of
+    ok = emqx_cm:takeover_kick(ClientID),
+    case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
         Session0 = #{} ->
             Session = Session0#{props => Conf},
             {true, ensure_timers(Session), []};
@@ -613,7 +614,8 @@ disconnect(Session = #{s := S0}, ConnInfo) ->
     {shutdown, Session#{s => S}}.
 
 -spec terminate(Reason :: term(), session()) -> ok.
-terminate(_Reason, _Session = #{id := Id, s := S}) ->
+terminate(_Reason, Session = #{id := Id, s := S}) ->
+    maybe_set_will_message_timer(Session),
     _ = emqx_persistent_session_ds_state:commit(S),
     ?tp(debug, persistent_session_ds_terminate, #{id => Id}),
     ok.
@@ -685,9 +687,9 @@ sync(ClientId) ->
 %%
 %% Note: session API doesn't handle session takeovers, it's the job of
 %% the broker.
--spec session_open(id(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
+-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
     session() | false.
-session_open(SessionId, NewConnInfo, MaybeWillMsg) ->
+session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
     NowMS = now_ms(),
     case emqx_persistent_session_ds_state:open(SessionId) of
         {ok, S0} ->
@@ -706,7 +708,8 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) ->
                         maps:get(peername, NewConnInfo), S2
                     ),
                     S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
-                    S = emqx_persistent_session_ds_state:commit(S4),
+                    S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4),
+                    S = emqx_persistent_session_ds_state:commit(S5),
                     Inflight = emqx_persistent_session_ds_inflight:new(
                         receive_maximum(NewConnInfo)
                     ),
@@ -723,12 +726,13 @@ session_open(SessionId, NewConnInfo, MaybeWillMsg) ->
 
 -spec session_ensure_new(
     id(),
+    emqx_types:clientinfo(),
     emqx_types:conninfo(),
     emqx_maybe:t(message()),
     emqx_session:conf()
 ) ->
     session().
-session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) ->
+session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
     ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
     Now = now_ms(),
     S0 = emqx_persistent_session_ds_state:create_new(Id),
@@ -751,7 +755,8 @@ session_ensure_new(Id, ConnInfo, MaybeWillMsg, Conf) ->
         ]
     ),
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
-    S = emqx_persistent_session_ds_state:commit(S5),
+    S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5),
+    S = emqx_persistent_session_ds_state:commit(S6),
     #{
         id => Id,
         props => Conf,
@@ -1208,10 +1213,29 @@ seqno_diff(?QOS_2, A, B) ->
 %% Will message handling
 %%--------------------------------------------------------------------
 
--spec publish_will_message(session(), message()) -> ok.
-publish_will_message(_Session, #message{} = _WillMsg) ->
-    %% TODO
-    ok.
+-spec clear_will_message(session()) -> session().
+clear_will_message(#{s := S0} = Session) ->
+    S = emqx_persistent_session_ds_state:clear_will_message(S0),
+    Session#{s := S}.
+
+-spec publish_will_message_now(session(), message()) -> session().
+publish_will_message_now(#{} = Session, WillMsg = #message{}) ->
+    _ = emqx_broker:publish(WillMsg),
+    clear_will_message(Session).
+
+maybe_set_will_message_timer(#{id := SessionId, s := S}) ->
+    case emqx_persistent_session_ds_state:get_will_message(S) of
+        #message{} = WillMsg ->
+            WillDelayInterval = emqx_channel:will_delay_interval(WillMsg),
+            WillDelayInterval > 0 andalso
+                emqx_persistent_session_ds_gc_worker:check_session_after(
+                    SessionId,
+                    timer:seconds(WillDelayInterval)
+                ),
+            ok;
+        _ ->
+            ok
+    end.
 
 %%--------------------------------------------------------------------
 %% Tests

+ 1 - 0
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -76,5 +76,6 @@
 -define(last_id, last_id).
 -define(peername, peername).
 -define(will_message, will_message).
+-define(clientinfo, clientinfo).
 
 -endif.

+ 102 - 10
apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl

@@ -25,7 +25,9 @@
 
 %% API
 -export([
-    start_link/0
+    start_link/0,
+    check_session/1,
+    check_session_after/2
 ]).
 
 %% `gen_server' API
@@ -38,6 +40,7 @@
 
 %% call/cast/info records
 -record(gc, {}).
+-record(check_session, {id :: emqx_persistent_session_ds:id()}).
 
 %%--------------------------------------------------------------------------------
 %% API
@@ -46,6 +49,17 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+-spec check_session(emqx_persistent_session_ds:id()) -> ok.
+check_session(SessionId) ->
+    gen_server:cast(?MODULE, #check_session{id = SessionId}).
+
+-spec check_session_after(emqx_persistent_session_ds:id(), pos_integer()) -> ok.
+check_session_after(SessionId, Time0) ->
+    #{bump_interval := BumpInterval} = gc_context(),
+    Time = max(Time0, BumpInterval),
+    _ = erlang:send_after(Time, ?MODULE, #check_session{id = SessionId}),
+    ok.
+
 %%--------------------------------------------------------------------------------
 %% `gen_server' API
 %%--------------------------------------------------------------------------------
@@ -58,6 +72,9 @@ init(_Opts) ->
 handle_call(_Call, _From, State) ->
     {reply, error, State}.
 
+handle_cast(#check_session{id = SessionId}, State) ->
+    do_check_session(SessionId),
+    {noreply, State};
 handle_cast(_Cast, State) ->
     {noreply, State}.
 
@@ -65,6 +82,9 @@ handle_info(#gc{}, State) ->
     try_gc(),
     ensure_gc_timer(),
     {noreply, State};
+handle_info(#check_session{id = SessionId}, State) ->
+    do_check_session(SessionId),
+    {noreply, State};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -104,25 +124,65 @@ now_ms() ->
     erlang:system_time(millisecond).
 
 start_gc() ->
+    #{
+        min_last_alive := MinLastAlive,
+        min_last_alive_will_msg := MinLastAliveWillMsg
+    } = gc_context(),
+    gc_loop(
+        MinLastAlive, MinLastAliveWillMsg, emqx_persistent_session_ds_state:make_session_iterator()
+    ).
+
+gc_context() ->
     GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
     BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
     TimeThreshold = max(GCInterval, BumpInterval) * 3,
-    MinLastAlive = now_ms() - TimeThreshold,
-    gc_loop(MinLastAlive, emqx_persistent_session_ds_state:make_session_iterator()).
-
-gc_loop(MinLastAlive, It0) ->
+    NowMS = now_ms(),
+    #{
+        min_last_alive => NowMS - TimeThreshold,
+        %% For will messages, we don't need to be so strict as session GC (GC interval is
+        %% of the order of ~ 10 minutes by default, bump interval ~ 100 ms), otherwise
+        %% most will be sent very late.
+        min_last_alive_will_msg => NowMS - BumpInterval * 5,
+        time_threshold => TimeThreshold,
+        bump_interval => BumpInterval,
+        gc_interval => GCInterval
+    }.
+
+gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) ->
     GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
     case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
         {[], _It} ->
             ok;
         {Sessions, It} ->
-            [do_gc(SessionId, MinLastAlive, Metadata) || {SessionId, Metadata} <- Sessions],
-            gc_loop(MinLastAlive, It)
+            [
+                do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata)
+             || {SessionId, Metadata} <- Sessions
+            ],
+            gc_loop(MinLastAlive, MinLastAliveWillMsg, It)
     end.
 
-do_gc(SessionId, MinLastAlive, Metadata) ->
-    #{?last_alive_at := LastAliveAt, ?expiry_interval := EI} = Metadata,
-    case LastAliveAt + EI < MinLastAlive of
+do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata) ->
+    #{
+        ?last_alive_at := LastAliveAt,
+        ?expiry_interval := EI,
+        ?will_message := MaybeWillMessage,
+        ?clientinfo := ClientInfo
+    } = Metadata,
+    IsExpired = LastAliveAt + EI < MinLastAlive,
+    case
+        should_send_will_message(
+            MaybeWillMessage, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg
+        )
+    of
+        {true, PreparedMessage} ->
+            _ = emqx_broker:publish(PreparedMessage),
+            ok = emqx_persistent_session_ds_state:clear_will_message_now(SessionId),
+            ?tp(session_gc_published_will_msg, #{id => SessionId, msg => PreparedMessage}),
+            ok;
+        false ->
+            ok
+    end,
+    case IsExpired of
         true ->
             emqx_persistent_session_ds:destroy_session(SessionId),
             ?tp(debug, ds_session_gc_cleaned, #{
@@ -134,3 +194,35 @@ do_gc(SessionId, MinLastAlive, Metadata) ->
         false ->
             ok
     end.
+
+should_send_will_message(
+    undefined = _WillMsg, _ClientInfo, _IsExpired, _LastAliveAt, _MinLastAliveWillMsg
+) ->
+    false;
+should_send_will_message(WillMsg, ClientInfo, IsExpired, LastAliveAt, MinLastAliveWillMsg) ->
+    WillDelayIntervalS = emqx_channel:will_delay_interval(WillMsg),
+    WillDelayInterval = timer:seconds(WillDelayIntervalS),
+    PastWillDelay = LastAliveAt + WillDelayInterval < MinLastAliveWillMsg,
+    case PastWillDelay orelse IsExpired of
+        true ->
+            case emqx_channel:prepare_will_message_for_publishing(ClientInfo, WillMsg) of
+                {ok, PreparedMessage} ->
+                    {true, PreparedMessage};
+                {error, _} ->
+                    false
+            end;
+        false ->
+            false
+    end.
+
+do_check_session(SessionId) ->
+    case emqx_persistent_session_ds_state:print_session(SessionId) of
+        #{metadata := Metadata} ->
+            #{
+                min_last_alive := MinLastAlive,
+                min_last_alive_will_msg := MinLastAliveWillMsg
+            } = gc_context(),
+            do_gc(SessionId, MinLastAliveWillMsg, MinLastAlive, Metadata);
+        _ ->
+            ok
+    end.

+ 27 - 1
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -30,7 +30,8 @@
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_expiry_interval/1, set_expiry_interval/2]).
--export([get_will_message/1, set_will_message/2]).
+-export([get_clientinfo/1, set_clientinfo/2]).
+-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
 -export([get_peername/1, set_peername/2]).
 -export([new_id/1]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
@@ -291,6 +292,14 @@ get_peername(Rec) ->
 set_peername(Val, Rec) ->
     set_meta(?peername, Val, Rec).
 
+-spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()).
+get_clientinfo(Rec) ->
+    get_meta(?clientinfo, Rec).
+
+-spec set_clientinfo(emqx_types:clientinfo(), t()) -> t().
+set_clientinfo(Val, Rec) ->
+    set_meta(?clientinfo, Val, Rec).
+
 -spec get_will_message(t()) -> emqx_maybe:t(message()).
 get_will_message(Rec) ->
     get_meta(?will_message, Rec).
@@ -299,6 +308,23 @@ get_will_message(Rec) ->
 set_will_message(Val, Rec) ->
     set_meta(?will_message, Val, Rec).
 
+-spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok.
+clear_will_message_now(SessionId) when is_binary(SessionId) ->
+    transaction(fun() ->
+        case kv_restore(?session_tab, SessionId) of
+            [Metadata0] ->
+                Metadata = Metadata0#{?will_message => undefined},
+                kv_persist(?session_tab, SessionId, Metadata),
+                ok;
+            [] ->
+                ok
+        end
+    end).
+
+-spec clear_will_message(t()) -> t().
+clear_will_message(Rec) ->
+    set_will_message(undefined, Rec).
+
 -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
 new_id(Rec) ->
     LastId =

+ 11 - 5
apps/emqx/src/emqx_session.erl

@@ -90,7 +90,8 @@
 
 %% Will message handling
 -export([
-    publish_will_message/2
+    clear_will_message/1,
+    publish_will_message_now/2
 ]).
 
 % Timers
@@ -185,7 +186,8 @@
 -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
     {_IsPresent :: true, t(), _ReplayContext} | false.
 -callback destroy(t() | clientinfo()) -> ok.
--callback publish_will_message(t(), message()) -> ok.
+-callback clear_will_message(t()) -> t().
+-callback publish_will_message_now(t(), message()) -> t().
 
 %%--------------------------------------------------------------------
 %% Create a Session
@@ -645,6 +647,10 @@ run_hook(Name, Args) ->
 %% Will message handling
 %%--------------------------------------------------------------------
 
--spec publish_will_message(t(), message()) -> ok.
-publish_will_message(Session, WillMsg) ->
-    ?IMPL(Session):publish_will_message(Session, WillMsg).
+-spec clear_will_message(t()) -> t().
+clear_will_message(Session) ->
+    ?IMPL(Session):clear_will_message(Session).
+
+-spec publish_will_message_now(t(), message()) -> t().
+publish_will_message_now(Session, WillMsg) ->
+    ?IMPL(Session):publish_will_message_now(Session, WillMsg).

+ 9 - 4
apps/emqx/src/emqx_session_mem.erl

@@ -108,7 +108,8 @@
 
 %% Will message handling
 -export([
-    publish_will_message/2
+    clear_will_message/1,
+    publish_will_message_now/2
 ]).
 
 %% Export for CT
@@ -808,10 +809,14 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
 %% Will message handling
 %%--------------------------------------------------------------------
 
--spec publish_will_message(session(), message()) -> ok.
-publish_will_message(#session{}, #message{} = WillMsg) ->
+-spec clear_will_message(session()) -> session().
+clear_will_message(#session{} = Session) ->
+    Session.
+
+-spec publish_will_message_now(session(), message()) -> session().
+publish_will_message_now(#session{} = Session, #message{} = WillMsg) ->
     _ = emqx_broker:publish(WillMsg),
-    ok.
+    Session.
 
 %%--------------------------------------------------------------------
 %% Helper functions

+ 4 - 0
apps/emqx/src/proto/emqx_cm_proto_v2.erl

@@ -20,6 +20,7 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
 
     lookup_client/2,
     kickout_client/2,
@@ -39,6 +40,9 @@
 introduced_in() ->
     "5.0.0".
 
+deprecated_since() ->
+    "5.7.0".
+
 -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
 kickout_client(Node, ClientId) ->
     rpc:call(Node, emqx_cm, kick_session, [ClientId]).

+ 106 - 0
apps/emqx/src/proto/emqx_cm_proto_v3.erl

@@ -0,0 +1,106 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    lookup_client/2,
+    kickout_client/2,
+
+    get_chan_stats/2,
+    get_chan_info/2,
+    get_chann_conn_mod/2,
+
+    takeover_session/2,
+    takeover_finish/2,
+    kick_session/3,
+
+    %% Introduced in v3
+    takeover_kick_session/2
+]).
+
+-include("bpapi.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
+
+introduced_in() ->
+    "5.7.0".
+
+-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
+kickout_client(Node, ClientId) ->
+    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
+
+-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+    [emqx_cm:channel_info()] | {badrpc, _}.
+lookup_client(Node, Key) ->
+    rpc:call(Node, emqx_cm, lookup_client, [Key]).
+
+-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    emqx_types:stats() | undefined | {badrpc, _}.
+get_chan_stats(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    emqx_types:infos() | undefined | {badrpc, _}.
+get_chan_info(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    module() | undefined | {badrpc, _}.
+get_chann_conn_mod(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    none
+    | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+    %% NOTE: v5.3.0
+    | {living, _ConnMod :: atom(), emqx_session:session()}
+    | {expired | persistent, emqx_session:session()}
+    | {badrpc, _}.
+takeover_session(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
+
+-spec takeover_finish(module(), emqx_cm:chan_pid()) ->
+    {ok, emqx_types:takeover_data()}
+    | {ok, list(emqx_types:deliver())}
+    | {error, term()}
+    | {badrpc, _}.
+takeover_finish(ConnMod, ChanPid) ->
+    erpc:call(
+        node(ChanPid),
+        emqx_cm,
+        takeover_finish,
+        [ConnMod, ChanPid],
+        ?T_TAKEOVER * 2
+    ).
+
+-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
+kick_session(Action, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).
+
+%%--------------------------------------------------------------------------------
+%% Introduced in v3
+%%--------------------------------------------------------------------------------
+
+-spec takeover_kick_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    ok | {badrpc, _}.
+takeover_kick_session(ClientId, ChanPid) ->
+    rpc:call(
+        node(ChanPid), emqx_cm, do_takeover_kick_session_v3, [ClientId, ChanPid], ?T_TAKEOVER * 2
+    ).

+ 1 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -150,6 +150,7 @@ when
         work_dir := file:name()
     }.
 start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
+    emqx_common_test_helpers:clear_screen(),
     % 1. Prepare appspec instructions
     AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
     % 2. Load every app so that stuff scanning attributes of loaded modules works

+ 62 - 16
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -77,6 +77,7 @@ init_per_group(persistence_enabled, Config) ->
             "  enable = true\n"
             "  last_alive_update_interval = 100ms\n"
             "  renew_streams_interval = 100ms\n"
+            "  session_gc_interval = 2s\n"
             "}"},
         {persistence, ds}
         | Config
@@ -1241,15 +1242,70 @@ t_multiple_subscription_matches(Config) ->
     ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
     ok = emqtt:disconnect(Client2).
 
+%% Check that we don't get a will message when the client disconnects with success reason
+%% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1.
+t_no_will_message(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    WillTopic = ?config(topic, Config),
+    WillPayload = <<"will message">>,
+    ClientId = ?config(client_id, Config),
+
+    ?check_trace(
+        #{timetrap => 15_000},
+        begin
+            ok = emqx:subscribe(WillTopic, #{qos => 2}),
+            {ok, Client} = emqtt:start_link([
+                {clientid, ClientId},
+                {proto_ver, v5},
+                {properties, #{'Session-Expiry-Interval' => 1}},
+                {will_topic, WillTopic},
+                {will_payload, WillPayload},
+                {will_qos, 1},
+                {will_props, #{'Will-Delay-Interval' => 0}}
+                | Config
+            ]),
+            {ok, _} = emqtt:ConnFun(Client),
+            ok = emqtt:disconnect(Client, ?RC_SUCCESS),
+
+            %% No will message
+            ?assertNotReceive({deliver, WillTopic, _}, 5_000),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% Check that we get a single will message when the client disconnects with a non
 %% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
 %% QoS = 1.
 t_will_message1(Config) ->
+    do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}),
+    ok.
+
+%% Check that we get a single will message when the client disconnects with a non
+%% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0,
+%% QoS = 1.
+t_will_message2(Config) ->
+    do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}),
+    ok.
+
+%% Check that we get a single will message when the client disconnects with a non
+%% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0,
+%% QoS = 1.
+t_will_message3(Config) ->
+    do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}),
+    ok.
+
+do_t_will_message(Config, Opts) ->
+    #{
+        session_expiry := SessionExpiry,
+        will_delay := WillDelay
+    } = Opts,
     ConnFun = ?config(conn_fun, Config),
     WillTopic = ?config(topic, Config),
     WillPayload = <<"will message">>,
     ClientId = ?config(client_id, Config),
-    ok = emqx_hooks:add('client.connack', {?MODULE, on_client_connack, [self()]}, _Prio = 1000),
 
     ?check_trace(
         #{timetrap => 15_000},
@@ -1258,31 +1314,25 @@ t_will_message1(Config) ->
             {ok, Client} = emqtt:start_link([
                 {clientid, ClientId},
                 {proto_ver, v5},
-                {properties, #{'Session-Expiry-Interval' => 1}},
+                {properties, #{'Session-Expiry-Interval' => SessionExpiry}},
                 {will_topic, WillTopic},
                 {will_payload, WillPayload},
                 {will_qos, 1},
-                {will_props, #{'Will-Delay-Interval' => 1}}
+                {will_props, #{'Will-Delay-Interval' => WillDelay}}
                 | Config
             ]),
-            {{ok, _}, {ok, _}} =
-                ?wait_async_action(emqtt:ConnFun(Client), #{?snk_kind := client_connack}),
+            {ok, _} = emqtt:ConnFun(Client),
             ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
 
-            ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 5_000),
+            ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000),
             %% No duplicates
-            ?assertNotReceive({deliver, WillTopic, _}, 1_000),
+            ?assertNotReceive({deliver, WillTopic, _}, 100),
 
             ok
         end,
         []
     ),
     ok.
-t_will_message1(init, Config) ->
-    Config;
-t_will_message1('end', _Config) ->
-    ok = emqx_hooks:del('client.connack', {?MODULE, on_client_connack}),
-    ok.
 
 get_topicwise_order(Msgs) ->
     maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
@@ -1311,7 +1361,3 @@ pick_respective_msgs(MsgRefs, Msgs) ->
 debug_info(ClientId) ->
     Info = emqx_persistent_session_ds:print_session(ClientId),
     ct:pal("*** State:~n~p", [Info]).
-
-on_client_connack(_ConnInfo, _ReasonCode, _Props, _TestPid) ->
-    ?tp(client_connack, #{}),
-    ok.

+ 220 - 123
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 
 -define(CNT, 100).
 -define(SLEEP, 10).
@@ -32,25 +33,15 @@
 
 all() ->
     [
-        {group, mqttv3},
-        {group, mqttv5}
+        {group, persistence_disabled},
+        {group, persistence_enabled}
     ].
 
-init_per_suite(Config) ->
-    Apps = emqx_cth_suite:start(
-        [emqx],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    emqx_logger:set_log_level(debug),
-    [{apps, Apps} | Config].
-
-end_per_suite(Config) ->
-    Apps = ?config(apps, Config),
-    ok = emqx_cth_suite:stop(Apps),
-    ok.
-
 groups() ->
+    MQTTGroups = [{group, G} || G <- [mqttv3, mqttv5]],
     [
+        {persistence_enabled, MQTTGroups},
+        {persistence_disabled, MQTTGroups},
         {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()},
         {mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
     ].
@@ -67,11 +58,55 @@ tc_v5_only() ->
         t_takeover_session_then_abnormal_disconnect_2
     ].
 
+init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(persistence_enabled = Group, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx,
+                "session_persistence = {\n"
+                "  enable = true\n"
+                "  last_alive_update_interval = 100ms\n"
+                "  renew_streams_interval = 100ms\n"
+                "  session_gc_interval = 2s\n"
+                "}\n"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+    ),
+    emqx_logger:set_log_level(debug),
+    [
+        {apps, Apps},
+        {persistence_enabled, true}
+        | Config
+    ];
+init_per_group(persistence_disabled = Group, Config) ->
+    Apps = emqx_cth_suite:start(
+        [{emqx, "session_persistence.enable = false"}],
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+    ),
+    emqx_logger:set_log_level(debug),
+    [
+        {apps, Apps},
+        {persistence_enabled, false}
+        | Config
+    ];
 init_per_group(mqttv3, Config) ->
     lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3});
 init_per_group(mqttv5, Config) ->
     lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}).
 
+end_per_group(Group, Config) when
+    Group =:= persistence_disabled;
+    Group =:= persistence_enabled
+->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok;
 end_per_group(_Group, _Config) ->
     ok.
 
@@ -97,19 +132,34 @@ t_takeover(Config) ->
         ok = timer:sleep(?SLEEP * 2),
         meck:passthrough([Arg])
     end),
+    meck:expect(emqx_cm, takeover_kick, fun(Arg) ->
+        %% trigger more complex takeover conditions during 2-phase takeover protocol:
+        %% when messages are accumulated in 2 processes simultaneously,
+        %% and need to be properly ordered / deduplicated after the protocol commences.
+        ok = timer:sleep(?SLEEP * 2),
+        meck:passthrough([Arg])
+    end),
     Commands =
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
-            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
-            [{fun stop_client/1, []}],
+        lists:flatten([
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
+            {fun maybe_wait_subscriptions/1, []},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
+            {fun stop_client/1, []}
+        ]),
 
+    Sleep =
+        case ?config(persistence_enabled, Config) of
+            true -> 1_500;
+            false -> ?SLEEP
+        end,
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
             ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
             apply(Fun, [Ctx | Args])
         end,
-        #{},
+        #{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep},
         Commands
     ),
 
@@ -117,7 +167,7 @@ t_takeover(Config) ->
 
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
     ?assertReceive({'EXIT', CPid2, normal}),
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
     ct:pal("middle: ~p", [Middle]),
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     assert_messages_missed(AllMsgs, Received),
@@ -141,30 +191,36 @@ t_takeover_willmsg(Config) ->
         {will_qos, 0}
     ],
     Commands =
-        %% GIVEN client connect with will message
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
+        lists:flatten([
+            %% GIVEN client connect with will message
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun maybe_wait_subscriptions/1, []},
+            {fun start_client/5, [
+                <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
+            ]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
             %% WHEN client reconnect with clean_start = false
-            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs]
+        ]),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
             ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
             apply(Fun, [Ctx | Args])
         end,
-        #{},
+        #{persistence_enabled => ?config(persistence_enabled, Config)},
         Commands
     ),
 
     #{client := [CPid2, CPidSub, CPid1]} = FCtx,
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    Sleep =
+        case ?config(persistence_enabled, Config) of
+            true -> 2_000;
+            false -> ?SLEEP
+        end,
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
     assert_messages_missed(AllMsgs, ReceivedNoWill),
@@ -297,7 +353,7 @@ t_no_takeover_with_delayed_willmsg(Config) ->
     ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
     process_flag(trap_exit, true),
     ClientId = atom_to_binary(?FUNCTION_NAME),
-    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    WillTopic = <<ClientId/binary, "willtopic">>,
     Client1Msgs = messages(ClientId, 0, 10),
     WillOpts = [
         {proto_ver, ?config(mqtt_vsn, Config)},
@@ -312,24 +368,25 @@ t_no_takeover_with_delayed_willmsg(Config) ->
     ],
     Commands =
         %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
+        lists:flatten(
             [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+                {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+                {fun maybe_wait_subscriptions/1, []},
+                {fun start_client/5, [<<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []]},
+                [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs]
+            ]
+        ),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
             ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
             apply(Fun, [Ctx | Args])
         end,
-        #{},
+        #{persistence_enabled => ?config(persistence_enabled, Config)},
         Commands
     ),
 
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     assert_messages_missed(Client1Msgs, Received),
     {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>),
@@ -369,15 +426,15 @@ t_session_expire_with_delayed_willmsg(Config) ->
         {properties, #{'Session-Expiry-Interval' => 3}}
     ],
     Commands =
-        %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
-        %%        and delay-interval 10s > session expiry 3s.
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+        lists:flatten([
+            %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
+            %%        and delay-interval 10s > session expiry 3s.
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun start_client/5, [
+                <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
+            ]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs]
+        ]),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
@@ -388,7 +445,7 @@ t_session_expire_with_delayed_willmsg(Config) ->
         Commands
     ),
 
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
     ?assertNot(IsWill),
@@ -404,7 +461,15 @@ t_session_expire_with_delayed_willmsg(Config) ->
     ?assertNot(IsWill1),
     ?assertEqual([], ReceivedNoWill1),
     %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry.
-    Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
+    SessionSleep =
+        case ?config(persistence_enabled, Config) of
+            true ->
+                %% Session GC uses a larger, safer cutoff time.
+                10_000;
+            false ->
+                5_000
+        end,
+    Received2 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)],
     {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>),
     ?assertEqual([], ReceivedNoWill2),
     ?assert(IsWill12),
@@ -493,25 +558,23 @@ t_takeover_before_session_expire(Config) ->
     Commands =
         %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
         %%        and delay-interval 10s > session expiry 3s.
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
-            [
-                %% avoid two clients race for session takeover
-                {
-                    fun(CTX) ->
-                        timer:sleep(100),
-                        CTX
-                    end,
-                    []
-                }
-            ] ++
+        lists:flatten([
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun start_client/5, [
+                <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
+            ]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+            %% avoid two clients race for session takeover
+            {
+                fun(CTX) ->
+                    timer:sleep(100),
+                    CTX
+                end,
+                []
+            },
             %% WHEN: client session is taken over within 3s.
-            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}
+        ]),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
@@ -540,7 +603,7 @@ t_takeover_session_then_normal_disconnect(Config) ->
     ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
     process_flag(trap_exit, true),
     ClientId = atom_to_binary(?FUNCTION_NAME),
-    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    WillTopic = <<ClientId/binary, "willtopic">>,
     Client1Msgs = messages(ClientId, 0, 10),
     WillOpts = [
         {proto_ver, ?config(mqtt_vsn, Config)},
@@ -552,40 +615,42 @@ t_takeover_session_then_normal_disconnect(Config) ->
         {properties, #{'Session-Expiry-Interval' => 3}}
     ],
     Commands =
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
-            [
-                %% avoid two clients race for session takeover
-                {
-                    fun(CTX) ->
-                        timer:sleep(100),
-                        CTX
-                    end,
-                    []
-                }
-            ] ++
+        lists:flatten([
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun maybe_wait_subscriptions/1, []},
+            {fun start_client/5, [
+                <<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []
+            ]},
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+            %% avoid two clients race for session takeover
+            {
+                fun(CTX) ->
+                    timer:sleep(100),
+                    CTX
+                end,
+                []
+            },
             %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">>
             %%        and delay-interval 10s > session expiry 3s.
-            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun maybe_wait_subscriptions/1, []}
+        ]),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
             ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
             apply(Fun, [Ctx | Args])
         end,
-        #{},
+        #{persistence_enabled => ?config(persistence_enabled, Config)},
         Commands
     ),
     #{client := [CPid2, CPidSub, CPid1]} = FCtx,
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
+    Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
     %% WHEN: client disconnect normally.
     emqtt:disconnect(CPid2, ?RC_SUCCESS),
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received = Received1 ++ Received2,
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
     %% THEN: willmsg is not published.
@@ -600,7 +665,7 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
     ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
     process_flag(trap_exit, true),
     ClientId = atom_to_binary(?FUNCTION_NAME),
-    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    WillTopic = <<ClientId/binary, "willtopic">>,
     Client1Msgs = messages(ClientId, 0, 10),
     WillOpts = [
         {proto_ver, ?config(mqtt_vsn, Config)},
@@ -612,26 +677,24 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
         {properties, #{'Session-Expiry-Interval' => 3}}
     ],
     Commands =
-        %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
-        %%        and will-delay-interval 10s >  session expiry 3s.
-        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
-            [
-                {fun start_client/5, [
-                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
-                ]}
-            ] ++
-            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
-            [
-                %% avoid two clients race for session takeover
-                {
-                    fun(CTX) ->
-                        timer:sleep(100),
-                        CTX
-                    end,
-                    []
-                }
-            ] ++
-            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
+        lists:flatten([
+            %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
+            %%        and will-delay-interval 10s >  session expiry 3s.
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
+            {fun start_client/5, [
+                <<ClientId/binary, "_willsub">>, WillTopic, ?QOS_1, []
+            ]},
+            %% avoid two clients race for session takeover
+            {
+                fun(CTX) ->
+                    timer:sleep(100),
+                    CTX
+                end,
+                []
+            },
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
+            {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}
+        ]),
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
@@ -643,16 +706,26 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
     ),
     #{client := [CPid2, CPidSub, CPid1]} = FCtx,
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
+    Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
     %% WHEN: client disconnect abnormally
     emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received2 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
+    Received = Received1 ++ Received2,
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
     %% THEN: willmsg is not published before session expiry
     ?assertNot(IsWill),
     ?assertNotEqual([], ReceivedNoWill),
-    Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)],
-    {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>),
+    SessionSleep =
+        case ?config(persistence_enabled, Config) of
+            true ->
+                %% Session GC uses a larger, safer cutoff time (GC interval x 3)
+                10_000;
+            false ->
+                3_000
+        end,
+    Received3 = [Msg || {publish, Msg} <- ?drainMailbox(SessionSleep)],
+    {IsWill1, ReceivedNoWill1} = filter_payload(Received3, <<"willpayload_delay10">>),
     %% AND THEN: willmsg is published after session expiry
     ?assert(IsWill1),
     ?assertEqual([], ReceivedNoWill1),
@@ -866,19 +939,39 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) ->
     end),
     Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
 
+maybe_wait_subscriptions(Ctx = #{persistence_enabled := true, client := CPids}) ->
+    ok = do_wait_subscription(CPids),
+    Ctx;
+maybe_wait_subscriptions(Ctx) ->
+    Ctx.
+
+do_wait_subscription([]) ->
+    ok;
+do_wait_subscription([CPid | Rest]) ->
+    try emqtt:subscriptions(CPid) of
+        [] ->
+            ok = timer:sleep(rand:uniform(?SLEEP)),
+            do_wait_subscription([CPid | Rest]);
+        [_ | _] ->
+            do_wait_subscription(Rest)
+    catch
+        exit:{noproc, _} ->
+            ok
+    end.
+
 kick_client(Ctx, ClientId) ->
     ok = emqx_cm:kick_session(ClientId),
     Ctx.
 
 publish_msg(Ctx, Msg) ->
     ok = timer:sleep(rand:uniform(?SLEEP)),
-    case emqx:publish(Msg) of
+    case emqx:publish(Msg#message{timestamp = emqx_message:timestamp_now()}) of
         [] -> publish_msg(Ctx, Msg);
         [_ | _] -> Ctx
     end.
 
-stop_client(Ctx = #{client := [CPid | _]}) ->
-    ok = timer:sleep(?SLEEP),
+stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
+    ok = timer:sleep(Sleep),
     ok = emqtt:stop(CPid),
     Ctx.
 
@@ -904,14 +997,18 @@ assert_messages_missed(Ls1, Ls2) ->
             error
     end.
 
-assert_messages_order([], []) ->
+assert_messages_order([] = _Expected, _Received) ->
     ok;
 assert_messages_order([Msg | Expected], Received) ->
     %% Account for duplicate messages:
     case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of
-        {[], [#{payload := Mismatch} | _]} ->
+        {[], [#{timestamp := TSMismatch, payload := Mismatch} | _]} ->
             ct:fail("Message order is not correct, expected: ~p, received: ~p", [
-                emqx_message:payload(Msg), Mismatch
+                {
+                    emqx_utils_calendar:epoch_to_rfc3339(emqx_message:timestamp(Msg)),
+                    emqx_message:payload(Msg)
+                },
+                {emqx_utils_calendar:epoch_to_rfc3339(TSMismatch), Mismatch}
             ]),
             error;
         {_Matching, Rest} ->