Przeglądaj źródła

Merge pull request #5875 from emqx/extend-persistent-sessions

Extend persistent sessions
Tobias Lindahl 4 lat temu
rodzic
commit
48c7788d6f

+ 14 - 2
Makefile

@@ -15,6 +15,8 @@ REL_PROFILES := emqx emqx-edge
 PKG_PROFILES := emqx-pkg emqx-edge-pkg
 PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
 
+CT_NODE_NAME ?= 'test@127.0.0.1'
+
 export REBAR_GIT_CLONE_OPTIONS += --depth=1
 
 .PHONY: default
@@ -44,7 +46,7 @@ proper: $(REBAR)
 
 .PHONY: ct
 ct: $(REBAR) conf-segs
-	@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name 'test@127.0.0.1' -c -v
+	@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v
 
 APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
 
@@ -52,7 +54,7 @@ APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
 .PHONY: $(APPS:%=%-ct)
 define gen-app-ct-target
 $1-ct:
-	$(REBAR) ct --name 'test@127.0.0.1' -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
+	$(REBAR) ct --name $(CT_NODE_NAME) -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
 endef
 $(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
 
@@ -64,6 +66,16 @@ $1-prop:
 endef
 $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
 
+.PHONY: ct-suite
+ct-suite: $(REBAR)
+ifneq ($(TESTCASE),)
+	$(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE)  --case $(TESTCASE)
+else ifneq ($(GROUP),)
+	$(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE)  --group $(GROUP)
+else
+	$(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE)
+endif
+
 .PHONY: cover
 cover: $(REBAR)
 	@ENABLE_COVER_COMPILE=1 $(REBAR) cover

+ 30 - 0
apps/emqx/etc/emqx.conf

@@ -1638,3 +1638,33 @@ example_common_websocket_options {
     client_max_window_bits = 15
   }
 }
+
+persistent_session_store {
+    ## Enable/disable internal persistent session store.
+    ##
+    ## @doc persistent_session_store.enabled
+    ## ValueType: Boolean
+    ## Default: false
+    enabled = false
+
+    ## How long undelivered messages are retained in the store
+    ##
+    ## @doc persistent_session_store.max_retain_undelivered
+    ## ValueType: Duration
+    ## Default: 1h
+    max_retain_undelivered = 1h
+
+    ## The time interval in which to try to run garbage collection of persistent session messages
+    ##
+    ## @doc persistent_session_store.message_gc_interval
+    ## ValueType: Duration
+    ## Default: 1h
+    message_gc_interval = 1h
+
+    ## The time interval in which to try to run garbage collection of persistent session transient data
+    ##
+    ## @doc persistent_session_store.session_message_gc_interval
+    ## ValueType: Duration
+    ## Default: 1m
+    session_message_gc_interval = 1m
+}

+ 3 - 1
apps/emqx/include/emqx.hrl

@@ -23,10 +23,12 @@
 -define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
 -define(CM_SHARD, emqx_cm_shard).
 -define(ROUTE_SHARD, route_shard).
+-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
 
 -define(BOOT_SHARDS, [ ?ROUTE_SHARD
                      , ?COMMON_SHARD
                      , ?SHARED_SUB_SHARD
+                     , ?PERSISTENT_SESSION_SHARD
                      ]).
 
 %% Banner
@@ -87,7 +89,7 @@
 
 -record(route, {
           topic :: binary(),
-          dest  :: node() | {binary(), node()}
+          dest  :: node() | {binary(), node()} | emqx_session:sessionID()
          }).
 
 %%--------------------------------------------------------------------

+ 1 - 0
apps/emqx/src/emqx_app.erl

@@ -41,6 +41,7 @@
 
 start(_Type, _Args) ->
     ok = maybe_load_config(),
+    ok = emqx_persistent_session:init_db_backend(),
     ok = maybe_start_quicer(),
     wait_boot_shards(),
     {ok, Sup} = emqx_sup:start_link(),

+ 1 - 0
apps/emqx/src/emqx_broker.erl

@@ -206,6 +206,7 @@ publish(Msg) when is_record(Msg, message) ->
                            payload => emqx_message:to_log_map(Msg)}),
             [];
         Msg1 = #message{topic = Topic} ->
+            emqx_persistent_session:persist_message(Msg1),
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
 

+ 93 - 49
apps/emqx/src/emqx_channel.erl

@@ -33,8 +33,6 @@
         , get_mqtt_conf/2
         , get_mqtt_conf/3
         , set_conn_state/2
-        , get_session/1
-        , set_session/2
         , stats/1
         , caps/1
         ]).
@@ -180,11 +178,10 @@ info(timers, #channel{timers = Timers}) -> Timers.
 set_conn_state(ConnState, Channel) ->
     Channel#channel{conn_state = ConnState}.
 
-get_session(#channel{session = Session}) ->
-    Session.
-
-set_session(Session, Channel) ->
-    Channel#channel{session = Session}.
+set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
+    %% Assume that this is also an updated session. Allow side effect.
+    Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
+    Channel#channel{session = Session1}.
 
 %% TODO: Add more stats.
 -spec(stats(channel()) -> emqx_types:stats()).
@@ -369,10 +366,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
     case emqx_session:puback(PacketId, Session) of
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
-            {ok, Channel#channel{session = NSession}};
+            {ok, set_session(NSession, Channel)};
         {ok, Msg, Publishes, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
-            handle_out(publish, Publishes, Channel#channel{session = NSession});
+            handle_out(publish, Publishes, set_session(NSession, Channel));
         {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.puback.inuse'),
@@ -388,7 +385,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
     case emqx_session:pubrec(PacketId, Session) of
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
-            NChannel = Channel#channel{session = NSession},
+            NChannel = set_session(NSession, Channel),
             handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
@@ -403,7 +400,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
 handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
     case emqx_session:pubrel(PacketId, Session) of
         {ok, NSession} ->
-            NChannel = Channel#channel{session = NSession},
+            NChannel = set_session(NSession, Channel),
             handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
             ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
@@ -414,9 +411,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
 handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
     case emqx_session:pubcomp(PacketId, Session) of
         {ok, NSession} ->
-            {ok, Channel#channel{session = NSession}};
+            {ok, set_session(NSession, Channel)};
         {ok, Publishes, NSession} ->
-            handle_out(publish, Publishes, Channel#channel{session = NSession});
+            handle_out(publish, Publishes, set_session(NSession, Channel));
         {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ok = emqx_metrics:inc('packets.pubcomp.inuse'),
             {ok, Channel};
@@ -624,7 +621,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
     case emqx_session:publish(PacketId, Msg, Session) of
         {ok, PubRes, NSession} ->
             RC = puback_reason_code(PubRes),
-            NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}),
+            NChannel0 = set_session(NSession, Channel),
+            NChannel1 = ensure_timer(await_timer, NChannel0),
             NChannel2 = ensure_quota(PubRes, NChannel1),
             handle_out(pubrec, {PacketId, RC}, NChannel2);
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
@@ -698,7 +696,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
     NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
     case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
         {ok, NSession} ->
-            {QoS, Channel#channel{session = NSession}};
+            {QoS, set_session(NSession, Channel)};
         {error, RC} ->
             ?SLOG(warning, #{
                 msg => "cannot_subscribe_topic_filter",
@@ -728,7 +726,7 @@ do_unsubscribe(TopicFilter, SubOpts, Channel =
     TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
     case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
         {ok, NSession} ->
-            {?RC_SUCCESS, Channel#channel{session = NSession}};
+            {?RC_SUCCESS, set_session(NSession, Channel)};
         {error, RC} -> {RC, Channel}
     end.
 %%--------------------------------------------------------------------
@@ -752,8 +750,23 @@ process_disconnect(ReasonCode, Properties, Channel) ->
     {ok, {close, disconnect_reason(ReasonCode)}, NChannel}.
 
 maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
-                             Channel = #channel{conninfo = ConnInfo}) ->
-    Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}};
+                             Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
+    EI = timer:seconds(Interval),
+    OldEI = maps:get(expiry_interval, ConnInfo, 0),
+    case OldEI =:= EI of
+        true -> Channel;
+        false ->
+            NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}},
+            ClientID = maps:get(clientid, ClientInfo, undefined),
+            %% Check if the client turns off persistence (turning it on is disallowed)
+            case EI =:= 0 andalso OldEI > 0 of
+                true ->
+                    S = emqx_persistent_session:discard(ClientID, NChannel#channel.session),
+                    set_session(S, NChannel);
+                false ->
+                    NChannel
+            end
+    end;
 maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 
 %%--------------------------------------------------------------------
@@ -765,39 +778,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
                                             session    = Session,
                                             clientinfo = #{clientid := ClientId}}) ->
-    NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
-    {ok, Channel#channel{session = NSession}};
+    Delivers1 = maybe_nack(Delivers),
+    Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session),
+    NSession = emqx_session:enqueue(Delivers2, Session),
+    NChannel = set_session(NSession, Channel),
+    %% We consider queued/dropped messages as delivered since they are now in the session state.
+    maybe_mark_as_delivered(Session, Delivers),
+    {ok, NChannel};
 
 handle_deliver(Delivers, Channel = #channel{takeover = true,
                                             pendings = Pendings,
                                             session = Session,
                                             clientinfo = #{clientid := ClientId}}) ->
-    NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
+    NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
     {ok, Channel#channel{pendings = NPendings}};
 
 handle_deliver(Delivers, Channel = #channel{session = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
-    case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
+                                            clientinfo = #{clientid := ClientId}
+                                           }) ->
+    case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of
         {ok, Publishes, NSession} ->
-            NChannel = Channel#channel{session = NSession},
+            NChannel = set_session(NSession, Channel),
+            maybe_mark_as_delivered(NSession, Delivers),
             handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
         {ok, NSession} ->
-            {ok, Channel#channel{session = NSession}}
+            {ok, set_session(NSession, Channel)}
     end.
 
-ignore_local(Delivers, Subscriber, Session) ->
-    Subs = emqx_session:info(subscriptions, Session),
-    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
-                        case maps:find(Topic, Subs) of
-                            {ok, #{nl := 1}} when Subscriber =:= Publisher ->
-                                ok = emqx_metrics:inc('delivery.dropped'),
-                                ok = emqx_metrics:inc('delivery.dropped.no_local'),
-                                true;
-                            _ ->
-                                false
-                        end
-                    end, Delivers).
-
 %% Nack delivers from shared subscription
 maybe_nack(Delivers) ->
     lists:filter(fun not_nacked/1, Delivers).
@@ -806,6 +813,14 @@ not_nacked({deliver, _Topic, Msg}) ->
     not (emqx_shared_sub:is_ack_required(Msg)
          andalso (ok == emqx_shared_sub:nack_no_connection(Msg))).
 
+maybe_mark_as_delivered(Session, Delivers) ->
+    case emqx_session:info(is_persistent, Session) of
+        false -> skip;
+        true ->
+            SessionID = emqx_session:info(id, Session),
+            emqx_persistent_session:mark_as_delivered(SessionID, Delivers)
+    end.
+
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
@@ -898,13 +913,13 @@ return_connack(AckPacket, Channel) ->
     case maybe_resume_session(Channel) of
         ignore -> {ok, Replies, Channel};
         {ok, Publishes, NSession} ->
-            NChannel = Channel#channel{session  = NSession,
-                                       resuming = false,
+            NChannel0 = Channel#channel{resuming = false,
                                        pendings = []
                                       },
-            {Packets, NChannel1} = do_deliver(Publishes, NChannel),
+            NChannel1 = set_session(NSession, NChannel0),
+            {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
             Outgoing = [{outgoing, Packets} || length(Packets) > 0],
-            {ok, Replies ++ Outgoing, NChannel1}
+            {ok, Replies ++ Outgoing, NChannel2}
     end.
 
 %%--------------------------------------------------------------------
@@ -1028,10 +1043,28 @@ handle_info(clean_authz_cache, Channel) ->
     ok = emqx_authz_cache:empty_authz_cache(),
     {ok, Channel};
 
+handle_info(die_if_test = Info, Channel) ->
+    die_if_test_compiled(), %% exits the process when compiled with TEST; returns ok otherwise
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}), %% same structured log as the catch-all clause
+    {ok, Channel};
+
 handle_info(Info, Channel) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.
 
+-ifdef(TEST).
+
+-spec die_if_test_compiled() -> no_return().
+die_if_test_compiled() ->
+    exit(normal).
+
+-else.
+
+die_if_test_compiled() ->
+    ok.
+
+-endif.
+
 %%--------------------------------------------------------------------
 %% Handle timeout
 %%--------------------------------------------------------------------
@@ -1063,9 +1096,9 @@ handle_timeout(_TRef, retry_delivery,
                Channel = #channel{session = Session}) ->
     case emqx_session:retry(Session) of
         {ok, NSession} ->
-            {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
+            {ok, clean_timer(retry_timer, set_session(NSession, Channel))};
         {ok, Publishes, Timeout, NSession} ->
-            NChannel = Channel#channel{session = NSession},
+            NChannel = set_session(NSession, Channel),
             handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
     end;
 
@@ -1076,9 +1109,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
                Channel = #channel{session = Session}) ->
     case emqx_session:expire(awaiting_rel, Session) of
         {ok, NSession} ->
-            {ok, clean_timer(await_timer, Channel#channel{session = NSession})};
+            {ok, clean_timer(await_timer, set_session(NSession, Channel))};
         {ok, Timeout, NSession} ->
-            {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
+            {ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))}
     end;
 
 handle_timeout(_TRef, expire_session, Channel) ->
@@ -1145,11 +1178,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
 terminate(_, #channel{conn_state = idle}) -> ok;
 terminate(normal, Channel) ->
     run_terminate_hook(normal, Channel);
-terminate({shutdown, Reason}, Channel)
-  when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered ->
+terminate({shutdown, kicked}, Channel) ->
+    _ = emqx_persistent_session:persist(Channel#channel.clientinfo,
+                                        Channel#channel.conninfo,
+                                        Channel#channel.session),
+    run_terminate_hook(kicked, Channel);
+terminate({shutdown, Reason}, Channel) when Reason =:= discarded;
+                                            Reason =:= takeovered ->
     run_terminate_hook(Reason, Channel);
 terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
     (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
+    _ = emqx_persistent_session:persist(Channel#channel.clientinfo,
+                                        Channel#channel.conninfo,
+                                        Channel#channel.session),
     run_terminate_hook(Reason, Channel).
 
 run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
@@ -1613,8 +1654,11 @@ maybe_resume_session(#channel{resuming = false}) ->
     ignore;
 maybe_resume_session(#channel{session  = Session,
                               resuming = true,
-                              pendings = Pendings}) ->
+                              pendings = Pendings,
+                              clientinfo = #{clientid := ClientId}}) ->
     {ok, Publishes, Session1} = emqx_session:replay(Session),
+    %% We consider queued/dropped messages as delivered since they are now in the session state.
+    emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
     case emqx_session:deliver(Pendings, Session1) of
         {ok, Session2} ->
             {ok, Publishes, Session2};

+ 39 - 13
apps/emqx/src/emqx_cm.erl

@@ -19,7 +19,6 @@
 
 -behaviour(gen_server).
 
--include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -214,9 +213,11 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     Self = self(),
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId),
+                     ok = emqx_persistent_session:discard_if_present(ClientId),
                      Session = create_session(ClientInfo, ConnInfo),
+                     Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
                      register_channel(ClientId, Self, ConnInfo),
-                     {ok, #{session => Session, present => false}}
+                     {ok, #{session => Session1, present => false}}
                  end,
     emqx_cm_locker:trans(ClientId, CleanStart);
 
@@ -224,17 +225,34 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     Self = self(),
     ResumeStart = fun(_) ->
                       case takeover_session(ClientId) of
-                          {ok, ConnMod, ChanPid, Session} ->
+                          {persistent, Session} ->
+                              %% This is a persistent session without a managing process.
+                              {Session1, Pendings} =
+                                  emqx_persistent_session:resume(ClientInfo, ConnInfo, Session),
+                              register_channel(ClientId, Self, ConnInfo),
+
+                              {ok, #{session  => Session1,
+                                     present  => true,
+                                     pendings => Pendings}};
+                          {living, ConnMod, ChanPid, Session} ->
                               ok = emqx_session:resume(ClientInfo, Session),
+                              Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
                               Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
                               register_channel(ClientId, Self, ConnInfo),
-                              {ok, #{session  => Session,
+                              {ok, #{session  => Session1,
                                      present  => true,
                                      pendings => Pendings}};
-                          {error, not_found} ->
+                          {expired, OldSession} ->
+                              _ = emqx_persistent_session:discard(ClientId, OldSession),
                               Session = create_session(ClientInfo, ConnInfo),
+                              Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
                               register_channel(ClientId, Self, ConnInfo),
-                              {ok, #{session => Session, present => false}}
+                              {ok, #{session => Session1, present => false}};
+                          none ->
+                              Session = create_session(ClientInfo, ConnInfo),
+                              Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
+                              register_channel(ClientId, Self, ConnInfo),
+                              {ok, #{session => Session1, present => false}}
                       end
                   end,
     emqx_cm_locker:trans(ClientId, ResumeStart).
@@ -246,13 +264,17 @@ create_session(ClientInfo, ConnInfo) ->
     ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
     Session.
 
-get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
+get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_interval := EI}) ->
     #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
       upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
       max_inflight => MaxInflight,
       retry_interval => get_mqtt_conf(Zone, retry_interval),
       await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
-      mqueue => mqueue_confs(Zone)
+      mqueue => mqueue_confs(Zone),
+      %% TODO: Add conf for allowing/disallowing persistent sessions.
+      %% Note that the connection info is already enriched to have
+      %% default config values for session expiry.
+      is_persistent => EI > 0
      }.
 
 mqueue_confs(Zone) ->
@@ -266,11 +288,15 @@ get_mqtt_conf(Zone, Key) ->
     emqx_config:get_zone_conf(Zone, [mqtt, Key]).
 
 %% @doc Try to takeover a session.
--spec(takeover_session(emqx_types:clientid()) ->
-        {error, term()} | {ok, atom(), pid(), emqx_session:session()}).
+-spec takeover_session(emqx_types:clientid()) ->
+          none
+        | {living, atom(), pid(), emqx_session:session()}
+        | {persistent, emqx_session:session()}
+        | {expired, emqx_session:session()}.
 takeover_session(ClientId) ->
     case lookup_channels(ClientId) of
-        [] -> {error, not_found};
+        [] ->
+            emqx_persistent_session:lookup(ClientId);
         [ChanPid] ->
             takeover_session(ClientId, ChanPid);
         ChanPids ->
@@ -285,10 +311,10 @@ takeover_session(ClientId) ->
 takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
     case get_chann_conn_mod(ClientId, ChanPid) of
         undefined ->
-            {error, not_found};
+            emqx_persistent_session:lookup(ClientId);
         ConnMod when is_atom(ConnMod) ->
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
-            {ok, ConnMod, ChanPid, Session}
+            {living, ConnMod, ChanPid, Session}
     end;
 
 takeover_session(ClientId, ChanPid) ->

+ 3 - 0
apps/emqx/src/emqx_guid.erl

@@ -39,6 +39,9 @@
         , from_base62/1
         ]).
 
+-export_type([ guid/0
+             ]).
+
 -define(TAG_VERSION, 131).
 -define(PID_EXT, 103).
 -define(NEW_PID_EXT, 88).

+ 503 - 0
apps/emqx/src/emqx_persistent_session.erl

@@ -0,0 +1,503 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session).
+
+-export([ is_store_enabled/0
+        , init_db_backend/0
+        ]).
+
+-export([ discard/2
+        , discard_if_present/1
+        , lookup/1
+        , persist/3
+        , persist_message/1
+        , pending/1
+        , pending/2
+        , resume/3
+        ]).
+
+-export([ add_subscription/3
+        , remove_subscription/3
+        ]).
+
+-export([ mark_as_delivered/2
+        , mark_resume_begin/1
+        ]).
+
+-export([ pending_messages_in_db/2
+        , delete_session_message/1
+        , gc_session_messages/1
+        , session_message_info/2
+        ]).
+
+-export([ delete_message/1
+        , first_message_id/0
+        , next_message_id/1
+        ]).
+
+-export_type([ sess_msg_key/0
+             ]).
+
+-include("emqx.hrl").
+-include("emqx_persistent_session.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-compile({inline, [is_store_enabled/0]}).
+
+-define(MAX_EXPIRY_INTERVAL, 4294967295000). %% 16#FFFFFFFF * 1000
+
+%% NOTE: Order is significant because of traversal order of the table.
+-define(MARKER, 3).
+-define(DELIVERED, 2).
+-define(UNDELIVERED, 1).
+-define(ABANDONED, 0).
+
+
+-type bin_timestamp() :: <<_:64>>.
+-opaque sess_msg_key() ::
+          {emqx_guid:guid(), emqx_guid:guid(), emqx_types:topic(), ?UNDELIVERED | ?DELIVERED}
+        | {emqx_guid:guid(), emqx_guid:guid(), <<>>              , ?MARKER}
+        | {emqx_guid:guid(), <<>>            , bin_timestamp()   , ?ABANDONED}.
+
+-type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok').
+
+%%--------------------------------------------------------------------
+%% Init
+%%--------------------------------------------------------------------
+
+init_db_backend() ->
+    case is_store_enabled() of
+        true  ->
+            ok = emqx_trie:create_session_trie(),
+            emqx_persistent_session_mnesia_backend:create_tables(),
+            persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend),
+            ok;
+        false ->
+            persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
+            ok
+    end.
+
+is_store_enabled() ->
+    emqx_config:get(?is_enabled_key).
+
+%%--------------------------------------------------------------------
+%% Session message ADT API
+%%--------------------------------------------------------------------
+
+-spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term().
+session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
+session_message_info(timestamp, {_, GUID, _        , _         }) -> emqx_guid:timestamp(GUID);
+session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID.
+
+%%--------------------------------------------------------------------
+%% DB API
+%%--------------------------------------------------------------------
+
+first_message_id() ->
+    ?db_backend:first_message_id().
+
+next_message_id(Key) ->
+    ?db_backend:next_message_id(Key).
+
+delete_message(Key) ->
+    ?db_backend:delete_message(Key).
+
+first_session_message() ->
+    ?db_backend:first_session_message().
+
+next_session_message(Key) ->
+    ?db_backend:next_session_message(Key).
+
+delete_session_message(Key) ->
+    ?db_backend:delete_session_message(Key).
+
+put_session_store(#session_store{} = SS) ->
+    ?db_backend:put_session_store(SS).
+
+delete_session_store(ClientID) ->
+    ?db_backend:delete_session_store(ClientID).
+
+lookup_session_store(ClientID) ->
+    ?db_backend:lookup_session_store(ClientID).
+
+put_session_message({_, _, _, _} = Key) ->
+    ?db_backend:put_session_message(#session_msg{ key = Key }).
+
+put_message(Msg) ->
+    ?db_backend:put_message(Msg).
+
+get_message(MsgId) ->
+    ?db_backend:get_message(MsgId).
+
+pending_messages_in_db(SessionID, MarkerIds) ->
+    ?db_backend:ro_transaction(pending_messages_fun(SessionID, MarkerIds)).
+
+%%--------------------------------------------------------------------
+%% Session API
+%%--------------------------------------------------------------------
+
+%% The timestamp (TS) is the last time a client interacted with the session,
+%% or when the client disconnected.
+-spec persist(emqx_types:clientinfo(),
+              emqx_types:conninfo(),
+              emqx_session:session()) -> emqx_session:session().
+
+persist(#{ clientid := ClientID }, ConnInfo, Session) ->
+    case ClientID == undefined orelse not emqx_session:info(is_persistent, Session) of
+        true -> Session;
+        false ->
+            SS = #session_store{ client_id       = ClientID
+                               , expiry_interval = maps:get(expiry_interval, ConnInfo)
+                               , ts              = timestamp_from_conninfo(ConnInfo)
+                               , session         = Session},
+            case persistent_session_status(SS) of
+                not_persistent -> Session;
+                expired        -> discard(ClientID, Session);
+                persistent     -> put_session_store(SS),
+                                  Session
+            end
+    end.
+
+timestamp_from_conninfo(ConnInfo) ->
+    case maps:get(disconnected_at, ConnInfo, undefined) of
+        undefined  -> erlang:system_time(millisecond);
+        Disconnect -> Disconnect
+    end.
+
+lookup(ClientID) when is_binary(ClientID) ->
+    case lookup_session_store(ClientID) of
+        none -> none;
+        {value, #session_store{session = S} = SS} ->
+            case persistent_session_status(SS) of
+                expired        -> {expired, S};
+                persistent     -> {persistent, S}
+            end
+    end.
+
+-spec discard_if_present(binary()) -> 'ok'.
+discard_if_present(ClientID) ->
+    case lookup(ClientID) of
+        none -> ok;
+        {Tag, Session} when Tag =:= persistent; Tag =:= expired ->
+            _ = discard(ClientID, Session),
+            ok
+    end.
+
+-spec discard(binary(), emqx_session:session()) -> emqx_session:session().
+discard(ClientID, Session) -> %% returns Session with is_persistent set to false
+    discard_opt(is_store_enabled(), ClientID, Session). %% stored state is only removed when the store is enabled
+
+%% discard/2 worker; the first argument tells whether the store is enabled.
+%% With the store enabled: remove the session record, write an abandoned
+%% marker (keyed by the current time in microseconds) for the session id,
+%% and delete the session's routes. In both cases the returned session has
+%% `is_persistent' set to false.
+discard_opt(true, ClientID, Session) ->
+    delete_session_store(ClientID),
+    SID = emqx_session:info(id, Session),
+    NowUs = erlang:system_time(microsecond),
+    put_session_message({SID, <<>>, <<NowUs:64>>, ?ABANDONED}),
+    Subs = emqx_session:info(subscriptions, Session),
+    emqx_session_router:delete_routes(SID, Subs),
+    emqx_session:set_field(is_persistent, false, Session);
+discard_opt(false, _ClientID, Session) ->
+    emqx_session:set_field(is_persistent, false, Session).
+
+%% Write a fresh resume marker for the session into the session message
+%% table and return the marker id.
+-spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid().
+mark_resume_begin(SessionID) ->
+    Marker = emqx_guid:gen(),
+    put_session_message({SessionID, Marker, <<>>, ?MARKER}),
+    Marker.
+
+%% Add a session route for a subscription. Only done for persistent
+%% sessions and only while the persistent session store is enabled.
+add_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
+    ok;
+add_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
+    case is_store_enabled() of
+        false -> ok;
+        true  -> emqx_session_router:do_add_route(TopicFilter, SessionID)
+    end.
+
+%% Remove the session route of a subscription. Only done for persistent
+%% sessions and only while the persistent session store is enabled.
+remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
+    ok;
+remove_subscription(TopicFilter, SessionID, true = _IsPersistent) ->
+    case is_store_enabled() of
+        false -> ok;
+        true  -> emqx_session_router:do_delete_route(TopicFilter, SessionID)
+    end.
+
+%%--------------------------------------------------------------------
+%% Resuming from DB state
+%%--------------------------------------------------------------------
+
+%% Resume a session from its stored state.
+%% Must be called inside an emqx_cm_locker transaction.
+%%
+%% Returns the resumed session (with the initially pending messages already
+%% enqueued and persisted) together with the deliveries collected while
+%% resuming: those found in the DB up to the resume markers followed by
+%% those buffered by the writers. As step 7 below says, the caller is
+%% expected to merge these with whatever is in its inbox.
+-spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()
+            ) -> {emqx_session:session(), [emqx_types:deliver()]}.
+resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
+    SessionID = emqx_session:info(id, Session),
+    ?tp(ps_resuming, #{from => db, sid => SessionID}),
+
+    %% NOTE: Order is important!
+
+    %% 1. Get pending messages from DB.
+    ?tp(ps_initial_pendings, #{sid => SessionID}),
+    Pendings1 = pending(SessionID),
+    Pendings2 = emqx_session:ignore_local(Pendings1, ClientID, Session),
+    %% The trace point reports the unfiltered list (Pendings1).
+    ?tp(ps_got_initial_pendings, #{ sid => SessionID
+                                  , msgs => Pendings1}),
+
+    %% 2. Enqueue messages to mimic that the process was alive
+    %%    when the messages were delivered.
+    ?tp(ps_persist_pendings, #{sid => SessionID}),
+    Session1 = emqx_session:enqueue(Pendings2, Session),
+    Session2 = persist(ClientInfo, ConnInfo, Session1),
+    mark_as_delivered(SessionID, Pendings2),
+    ?tp(ps_persist_pendings_msgs, #{ msgs => Pendings2
+                                   , sid => SessionID}),
+
+    %% 3. Notify writers that we are resuming.
+    %%    They will buffer new messages.
+    ?tp(ps_notify_writers, #{sid => SessionID}),
+    Nodes = mria_mnesia:running_nodes(),
+    NodeMarkers = resume_begin(Nodes, SessionID),
+    ?tp(ps_node_markers, #{sid => SessionID, markers => NodeMarkers}),
+
+    %% 4. Subscribe to topics.
+    ?tp(ps_resume_session, #{sid => SessionID}),
+    ok = emqx_session:resume(ClientInfo, Session2),
+
+    %% 5. Get pending messages from DB until we find all markers.
+    ?tp(ps_marker_pendings, #{sid => SessionID}),
+    MarkerIDs = [Marker || {_, Marker} <- NodeMarkers],
+    Pendings3 = pending(SessionID, MarkerIDs),
+    %% NOTE(review): this filters against the original Session, not Session2;
+    %% presumably ignore_local/3 only reads data that enqueue/persist do not
+    %% change - confirm.
+    Pendings4 = emqx_session:ignore_local(Pendings3, ClientID, Session),
+    ?tp(ps_marker_pendings_msgs, #{ sid => SessionID
+                                  , msgs => Pendings4}),
+
+    %% 6. Get pending messages from writers.
+    ?tp(ps_resume_end, #{sid => SessionID}),
+    WriterPendings = resume_end(Nodes, SessionID),
+    ?tp(ps_writer_pendings, #{ msgs => WriterPendings
+                             , sid => SessionID}),
+
+    %% 7. Drain the inbox and usort the messages
+    %%    with the pending messages. (Should be done by caller.)
+    {Session2, Pendings4 ++ WriterPendings}.
+
+%% Ask the session router on every node to start a resume for SessionID.
+%% Returns `{Node, Marker}' for the nodes that answered `{ok, Marker}';
+%% nodes whose call failed are left out.
+resume_begin(Nodes, SessionID) ->
+    Replies = erpc:multicall(Nodes, emqx_session_router, resume_begin, [self(), SessionID]),
+    [{Node, Marker} || {Node, {ok, {ok, Marker}}} <- lists:zip(Nodes, Replies)].
+
+%% Tell the session router on every node that the resume of SessionID is
+%% over and turn the messages they return into deliver tuples.
+%% Replies that are not `{ok, {ok, Messages}}' are dropped.
+resume_end(Nodes, SessionID) ->
+    Replies = erpc:multicall(Nodes, emqx_session_router, resume_end, [self(), SessionID]),
+    ?tp(ps_erpc_multical_result, #{ res => Replies, sid => SessionID }),
+    %% TODO: Should handle the errors
+    [{deliver, STopic, Msg} || {ok, {ok, Buffered}} <- Replies,
+                               {{Msg, STopic}} <- Buffered].
+
+
+%%--------------------------------------------------------------------
+%% Messages API
+%%--------------------------------------------------------------------
+
+%% Persist a published message for the persistent sessions it is routed
+%% to. Does nothing when the persistent session store is disabled.
+persist_message(Msg) ->
+    case is_store_enabled() of
+        false -> ok;
+        true  -> do_persist_message(Msg)
+    end.
+
+%% Store the message and one session message entry per matching session
+%% route. Messages with the `dup' flag, sys messages and messages without
+%% any matching session route are not stored.
+do_persist_message(Msg) ->
+    Skip = emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg),
+    case Skip of
+        true ->
+            ok;
+        false ->
+            Topic = emqx_message:topic(Msg),
+            case emqx_session_router:match_routes(Topic) of
+                [] ->
+                    ok;
+                Routes ->
+                    put_message(Msg),
+                    persist_message_routes(Routes, emqx_message:id(Msg), Msg)
+            end
+    end.
+
+%% For every session route: record the message as undelivered for that
+%% session and hand it to the session router buffer.
+persist_message_routes(Routes, MsgId, Msg) ->
+    lists:foreach(
+      fun(#route{dest = SessionID, topic = STopic}) ->
+              ?tp(ps_persist_msg, #{sid => SessionID, payload => emqx_message:payload(Msg)}),
+              put_session_message({SessionID, MsgId, STopic, ?UNDELIVERED}),
+              emqx_session_router:buffer(SessionID, STopic, Msg)
+      end,
+      Routes).
+
+%% Mark a list of deliveries as delivered for the session.
+%% Does nothing when the persistent session store is disabled.
+mark_as_delivered(SessionID, Delivers) ->
+    case is_store_enabled() of
+        false -> ok;
+        true  -> do_mark_as_delivered(SessionID, Delivers)
+    end.
+
+%% For each `{deliver, STopic, Msg}': when the session message table holds
+%% the matching undelivered entry, delete it; otherwise write a delivered
+%% entry for the message.
+do_mark_as_delivered(SessionID, Delivers) ->
+    lists:foreach(
+      fun({deliver, STopic, Msg}) ->
+              MsgID = emqx_message:id(Msg),
+              %% Seek from the smallest tag so the next key is the
+              %% undelivered entry of this message/topic, if there is one.
+              SeekKey = {SessionID, MsgID, STopic, ?ABANDONED},
+              case next_session_message(SeekKey) of
+                  {SessionID, MsgID, STopic, ?UNDELIVERED} = Key ->
+                      %% We can safely delete this entry
+                      %% instead of marking it as delivered.
+                      delete_session_message(Key);
+                  _ ->
+                      put_session_message({SessionID, MsgID, STopic, ?DELIVERED})
+              end
+      end,
+      Delivers).
+
+%% pending/1: pending messages of the session as currently found in the DB.
+%% NOTE(review): resume/3 feeds the result to mark_as_delivered/2, which
+%% matches `{deliver, STopic, Msg}' tuples, so the return type in these
+%% specs looks inaccurate - confirm against pending_messages_in_db/2.
+-spec pending(emqx_session:sessionID()) ->
+          [{emqx_types:message(), STopic :: binary()}].
+pending(SessionID) ->
+    pending_messages_in_db(SessionID, []).
+
+%% pending/2: ask the session router for the pending messages up to the
+%% given resume markers, retrying every 10 ms for as long as the answer is
+%% `incomplete'.
+-spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) ->
+          [{emqx_types:message(), STopic :: binary()}].
+pending(SessionID, MarkerIds) ->
+    %% TODO: Handle lost MarkerIDs
+    Result = emqx_session_router:pending(SessionID, MarkerIds),
+    case Result of
+        incomplete ->
+            timer:sleep(10),
+            pending(SessionID, MarkerIds);
+        _ ->
+            Result
+    end.
+
+%%--------------------------------------------------------------------
+%% Session internal functions
+%%--------------------------------------------------------------------
+
+%% @private [MQTT-3.1.2-23]
+%% Classify a stored session by its expiry interval:
+%%   0                    -> not_persistent
+%%   ?MAX_EXPIRY_INTERVAL -> persistent (no deadline check)
+%%   anything else        -> persistent while ts + expiry interval is still
+%%                           in the future (milliseconds), otherwise expired
+persistent_session_status(#session_store{expiry_interval = 0}) ->
+    not_persistent;
+persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
+    persistent;
+persistent_session_status(#session_store{expiry_interval = Interval, ts = StoredAt}) ->
+    Deadline = Interval + StoredAt,
+    case erlang:system_time(millisecond) < Deadline of
+        true  -> persistent;
+        false -> expired
+    end.
+
+%%--------------------------------------------------------------------
+%% Pending messages internal functions
+%%--------------------------------------------------------------------
+
+%% Build a fun that collects the pending messages of the session.
+%% The fun returns `incomplete' while some of MarkerIds have not been seen
+%% in the session message table, otherwise the list of deliveries.
+pending_messages_fun(SessionID, MarkerIds) ->
+    fun() ->
+        StartKey = {SessionID, <<>>, <<>>, ?DELIVERED},
+        case pending_messages(StartKey, [], MarkerIds) of
+            {_Pending, [_|_]} -> incomplete;
+            {Pending, []}     -> read_pending_msgs(Pending, [])
+        end
+    end.
+
+%% Read the stored message for every `{MsgId, STopic}' and return the
+%% deliveries in input order. A message that is missing from the store is
+%% skipped when its id is older than the retain period, and is an error
+%% otherwise.
+read_pending_msgs([], Acc) ->
+    lists:reverse(Acc);
+read_pending_msgs([{MsgId, STopic} | Rest], Acc) ->
+    NewAcc =
+        try get_message(MsgId) of
+            Msg -> [{deliver, STopic, Msg} | Acc]
+        catch
+            error:{msg_not_found, _} ->
+                HighwaterMark = erlang:system_time(microsecond)
+                    - emqx_config:get(?msg_retain) * 1000,
+                case emqx_guid:timestamp(MsgId) < HighwaterMark of
+                    true  -> Acc;  %% Probably cleaned by GC
+                    false -> error({msg_not_found, MsgId})
+                end
+        end,
+    read_pending_msgs(Rest, NewAcc).
+
+%% Collect the undelivered messages of a session from the session message
+%% table, starting after PrevKey.
+%%
+%% Returns `{Pending, MissingMarkerIds}' where Pending is a list of
+%% `{MsgId, STopic}' in key order and MissingMarkerIds are the requested
+%% markers that were not found. An abandoned marker for the session makes
+%% the result `{[], []}'.
+%%
+%% The keys are ordered by
+%%     {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
+%%     {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
+%%  where
+%%     <<>> < emqx_guid:guid()
+%%     <<>> < bin_timestamp()
+%%     emqx_guid:guid() is ordered in ts() and by node()
+%%     ?ABANDONED < ?UNDELIVERED < ?DELIVERED < ?MARKER
+%%
+%% An ?UNDELIVERED key is only known to be pending once the following key
+%% has been seen: if that key is the ?DELIVERED entry for the same message
+%% and topic, the message was delivered; otherwise the previous key is
+%% added to the accumulator.
+%%
+%% We traverse the table until we reach another session.
+%% TODO: Garbage collect the delivered messages.
+pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, MarkerIds) ->
+    case next_session_message(PrevKey) of
+        {S, <<>>, _TS, ?ABANDONED} when S =:= SessionID ->
+            %% The session was abandoned: nothing is pending.
+            {[], []};
+        {S, MsgId, <<>>, ?MARKER} = Key when S =:= SessionID ->
+            %% Found a resume marker: tick it off the wanted list.
+            MarkerIds1 = MarkerIds -- [MsgId],
+            case PrevTag =:= ?UNDELIVERED of
+                false -> pending_messages(Key, Acc, MarkerIds1);
+                true  -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds1)
+            end;
+        {S, MsgId, STopic, ?DELIVERED} = Key when S =:= SessionID,
+                                                  MsgId =:= PrevMsgId,
+                                                  STopic =:= PrevSTopic ->
+            %% Delivered entry for the previous key: that message is not pending.
+            pending_messages(Key, Acc, MarkerIds);
+        {S, _MsgId, _STopic, _Tag} = Key when S =:= SessionID ->
+            case PrevTag =:= ?UNDELIVERED of
+                false -> pending_messages(Key, Acc, MarkerIds);
+                true  -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds)
+            end;
+        _What -> %% Next sessionID or '$end_of_table'
+            case PrevTag =:= ?UNDELIVERED of
+                false -> {lists:reverse(Acc), MarkerIds};
+                true  -> {lists:reverse([{PrevMsgId, PrevSTopic}|Acc]), MarkerIds}
+            end
+    end.
+
+%%--------------------------------------------------------------------
+%% Garbage collection
+%%--------------------------------------------------------------------
+
+%% Traverse the whole session message table from its first key and report
+%% garbage collection candidates to Fun.
+-spec gc_session_messages(gc_traverse_fun()) -> 'ok'.
+gc_session_messages(Fun) ->
+    FirstKey = first_session_message(),
+    gc_traverse(FirstKey, <<>>, false, Fun).
+
+%% Walk the session message table in key order, starting at the given key,
+%% and report garbage collection candidates to Fun:
+%%   Fun(abandoned, Key) - abandoned marker of a session with no other keys
+%%   Fun(marker, Key)    - resume marker
+%%   Fun(delete, Key)    - entry that is safe to delete
+%% SessionID is the session of the previously visited key and Abandoned
+%% tells whether an abandoned marker was seen for that session. The flag
+%% must never carry over to another session, because every entry of an
+%% abandoned session is reported for deletion.
+gc_traverse('$end_of_table', _SessionID, _Abandoned, _Fun) ->
+    ok;
+gc_traverse({S, <<>>, _TS, ?ABANDONED} = Key, _SessionID, _Abandoned, Fun) ->
+    %% Only report the abandoned session if it has no messages.
+    %% We want to keep the abandoned marker to last to make the GC reentrant.
+    case next_session_message(Key) of
+        '$end_of_table' = NextKey ->
+            ok = Fun(abandoned, Key),
+            gc_traverse(NextKey, S, true, Fun);
+        {S2, _, _, _} = NextKey when S =:= S2 ->
+            gc_traverse(NextKey, S, true, Fun);
+        {_, _, _, _} = NextKey ->
+            ok = Fun(abandoned, Key),
+            gc_traverse(NextKey, S, true, Fun)
+    end;
+gc_traverse({S, _MsgID, <<>>, ?MARKER} = Key, SessionID, Abandoned, Fun) ->
+    ok = Fun(marker, Key),
+    NewAbandoned = S =:= SessionID andalso Abandoned,
+    gc_traverse(next_session_message(Key), S, NewAbandoned, Fun);
+gc_traverse({S, _MsgID, _STopic, _Tag} = Key, SessionID, Abandoned, Fun) when Abandoned andalso
+                                                                              S =:= SessionID ->
+    %% Delete all messages from an abandoned session.
+    ok = Fun(delete, Key),
+    gc_traverse(next_session_message(Key), S, Abandoned, Fun);
+gc_traverse({S, MsgID, STopic, ?UNDELIVERED} = Key, SessionID, Abandoned, Fun) ->
+    case next_session_message(Key) of
+        {S1, M, ST, ?DELIVERED} = NextKey when S1     =:= S andalso
+                                               MsgID  =:= M andalso
+                                               STopic =:= ST ->
+            %% We have both markers for the same message/topic so it is safe to delete both.
+            ok = Fun(delete, Key),
+            ok = Fun(delete, NextKey),
+            %% Recompute the flag before moving on. This clause can be reached
+            %% with Abandoned = true only when S differs from SessionID, and
+            %% the abandoned state of the previous session must not leak
+            %% into session S.
+            NewAbandoned = S =:= SessionID andalso Abandoned,
+            gc_traverse(next_session_message(NextKey), S, NewAbandoned, Fun);
+        NextKey ->
+            %% Something else is here, so let's just loop.
+            NewAbandoned = S =:= SessionID andalso Abandoned,
+            gc_traverse(NextKey, SessionID, NewAbandoned, Fun)
+    end;
+gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) ->
+    %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing.
+    NewAbandoned = S =:= SessionID andalso Abandoned,
+    gc_traverse(next_session_message(Key), S, NewAbandoned, Fun).

+ 33 - 0
apps/emqx/src/emqx_persistent_session.hrl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% Table names used by the persistent session backends.
+-define(SESSION_STORE, emqx_session_store).
+-define(SESS_MSG_TAB, emqx_session_msg).
+-define(MSG_TAB, emqx_persistent_msg).
+
+%% A stored session, keyed by client id. `expiry_interval' and `ts' are
+%% added together and compared against erlang:system_time(millisecond) to
+%% decide whether the session has expired.
+-record(session_store, { client_id        :: binary()
+                       , expiry_interval  :: non_neg_integer()
+                       , ts               :: non_neg_integer()
+                       , session          :: emqx_session:session()}).
+
+%% An entry of the session message table. All information is in the key;
+%% the value is always the empty list.
+-record(session_msg, {key      :: emqx_persistent_session:sess_msg_key(),
+                      val = [] :: []}).
+
+%% Configuration paths of the persistent session store.
+-define(db_backend_key, [persistent_session_store, db_backend]).
+-define(is_enabled_key, [persistent_session_store, enabled]).
+%% The value read from this path is multiplied by 1000 before being compared
+%% with microsecond timestamps, so it is presumably in milliseconds.
+-define(msg_retain, [persistent_session_store, max_retain_undelivered]).
+
+%% The backend module, read from persistent_term.
+-define(db_backend, (persistent_term:get(?db_backend_key))).

+ 76 - 0
apps/emqx/src/emqx_persistent_session_dummy_backend.erl

@@ -0,0 +1,76 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_dummy_backend).
+
+-include("emqx_persistent_session.hrl").
+
+-export([ first_message_id/0
+        , next_message_id/1
+        , delete_message/1
+        , first_session_message/0
+        , next_session_message/1
+        , delete_session_message/1
+        , put_session_store/1
+        , delete_session_store/1
+        , lookup_session_store/1
+        , put_session_message/1
+        , put_message/1
+        , get_message/1
+        , ro_transaction/1
+        ]).
+
+%% Backend that stores nothing: every table looks empty, every write and
+%% delete of session data succeeds without effect, and the functions that
+%% would need a stored message raise `should_not_be_called'.
+
+%% The message table is always empty.
+first_message_id() ->
+    '$end_of_table'.
+
+next_message_id(_) ->
+    '$end_of_table'.
+
+%% There are never any message ids to delete, so a call is an error.
+-spec delete_message(binary()) -> no_return().
+delete_message(_Key) ->
+    error(should_not_be_called).
+
+%% The session message table is always empty.
+first_session_message() ->
+    '$end_of_table'.
+
+next_session_message(_Key) ->
+    '$end_of_table'.
+
+delete_session_message(_Key) ->
+    ok.
+
+%% Accepts (and drops) a session store record.
+put_session_store(#session_store{}) ->
+    ok.
+
+delete_session_store(_ClientID) ->
+    ok.
+
+%% No session is ever found.
+lookup_session_store(_ClientID) ->
+    none.
+
+%% Accepts (and drops) a 4-tuple session message key.
+put_session_message({_, _, _, _}) ->
+    ok.
+
+put_message(_Msg) ->
+    ok.
+
+%% Nothing is ever stored, so there is no message to read.
+-spec get_message(binary()) -> no_return().
+get_message(_MsgId) ->
+    error(should_not_be_called).
+
+%% No transaction is needed: just run the fun.
+ro_transaction(Fun) ->
+    Fun().
+

+ 152 - 0
apps/emqx/src/emqx_persistent_session_gc.erl

@@ -0,0 +1,152 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_gc).
+
+-behaviour(gen_server).
+
+-include("emqx_persistent_session.hrl").
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        ]).
+
+-ifdef(TEST).
+-export([ session_gc_worker/2
+        , message_gc_worker/0
+        ]).
+-endif.
+
+-define(SERVER, ?MODULE).
+%% TODO: Maybe these should be configurable?
+%% Both grace periods are added to a timestamp that is compared with
+%% erlang:system_time(microsecond), i.e. they are in microseconds:
+%% 60 seconds for resume markers, 300 seconds for abandoned markers.
+-define(MARKER_GRACE_PERIOD, 60000000).
+-define(ABANDONED_GRACE_PERIOD, 300000000).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Start the GC server, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+%% The state is a map holding the current timer reference and the pid of
+%% the last spawned worker for each of the two GC kinds.
+init([]) ->
+    process_flag(trap_exit, true),
+    State0 = start_session_gc_timer(#{}),
+    State1 = start_message_gc_timer(State0),
+    {ok, State1}.
+
+%% No calls are defined; everything is answered with `ok'.
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+%% Timer expirations trigger the corresponding GC; any other message
+%% (including exit signals from the linked workers) is ignored.
+handle_info({timeout, Ref, session_gc_timeout}, State) ->
+    {noreply, session_gc_timeout(Ref, State)};
+handle_info({timeout, Ref, message_gc_timeout}, State) ->
+    {noreply, message_gc_timeout(Ref, State)};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Session messages GC
+%%--------------------------------------------------------------------
+
+%% (Re)arm the session message GC timer with the configured interval and
+%% remember its reference in the state.
+start_session_gc_timer(State) ->
+    Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
+    TimerRef = erlang:start_timer(Interval, self(), session_gc_timeout),
+    State#{session_gc_timer => TimerRef}.
+
+%% Handle expiry of the session GC timer. Timeouts whose reference is not
+%% the current timer are ignored. Otherwise the timer is re-armed, and a new
+%% linked worker is spawned unless the previous one is still running.
+session_gc_timeout(Ref, #{session_gc_timer := Ref} = State) ->
+    %% Prevent overlapping processes.
+    PrevPid = maps:get(session_gc_pid, State, undefined),
+    StillRunning = PrevPid =/= undefined andalso erlang:is_process_alive(PrevPid),
+    case StillRunning of
+        true ->
+            start_session_gc_timer(State);
+        false ->
+            Pid = proc_lib:spawn_link(fun session_gc_worker/0),
+            start_session_gc_timer(State#{session_gc_pid => Pid})
+    end;
+session_gc_timeout(_Ref, State) ->
+    State.
+
+%% Worker entry point: traverse the session message table and apply
+%% session_gc_worker/2 to every reported key.
+session_gc_worker() ->
+    ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2).
+
+%% `delete' keys are removed right away; `marker' and `abandoned' keys are
+%% only removed once they are older than their grace period.
+session_gc_worker(delete, Key) ->
+    emqx_persistent_session:delete_session_message(Key);
+session_gc_worker(marker, Key) ->
+    delete_session_message_if_older(Key, ?MARKER_GRACE_PERIOD);
+session_gc_worker(abandoned, Key) ->
+    delete_session_message_if_older(Key, ?ABANDONED_GRACE_PERIOD).
+
+%% Delete Key when its timestamp plus GracePeriod (microseconds) has passed.
+delete_session_message_if_older(Key, GracePeriod) ->
+    TS = emqx_persistent_session:session_message_info(timestamp, Key),
+    case TS + GracePeriod < erlang:system_time(microsecond) of
+        true  -> emqx_persistent_session:delete_session_message(Key);
+        false -> ok
+    end.
+
+%%--------------------------------------------------------------------
+%% Message GC
+%% --------------------------------------------------------------------
+%% The message GC simply removes all messages older than the retain
+%% period. A more exact GC would either involve treating the session
+%% message table as root set, or some kind of reference counting.
+%% We sacrifice space for simplicity at this point.
+%% (Re)arm the message GC timer and remember its reference in the state.
+%% NOTE(review): this reads the same `session_message_gc_interval' setting
+%% as the session GC timer - confirm whether a separate message GC interval
+%% setting was intended.
+start_message_gc_timer(State) ->
+    Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]),
+    TimerRef = erlang:start_timer(Interval, self(), message_gc_timeout),
+    State#{message_gc_timer => TimerRef}.
+
+%% Handle expiry of the message GC timer. Timeouts whose reference is not
+%% the current timer are ignored. Otherwise the timer is re-armed, and a new
+%% linked worker is spawned unless the previous one is still running.
+message_gc_timeout(Ref, #{message_gc_timer := Ref} = State) ->
+    %% Prevent overlapping processes.
+    PrevPid = maps:get(message_gc_pid, State, undefined),
+    StillRunning = PrevPid =/= undefined andalso erlang:is_process_alive(PrevPid),
+    case StillRunning of
+        true ->
+            start_message_gc_timer(State);
+        false ->
+            Pid = proc_lib:spawn_link(fun message_gc_worker/0),
+            start_message_gc_timer(State#{message_gc_pid => Pid})
+    end;
+message_gc_timeout(_Ref, State) ->
+    State.
+
+%% Worker entry point: delete stored messages older than the retain period.
+message_gc_worker() ->
+    Now = erlang:system_time(microsecond),
+    Retain = emqx_config:get(?msg_retain) * 1000,
+    message_gc_worker(emqx_persistent_session:first_message_id(), Now - Retain).
+
+%% Delete messages in key order until the table ends or the first message
+%% that is not older than HighWaterMark is found.
+message_gc_worker('$end_of_table', _HighWaterMark) ->
+    ok;
+message_gc_worker(MsgId, HighWaterMark) ->
+    case emqx_guid:timestamp(MsgId) >= HighWaterMark of
+        true ->
+            ok;
+        false ->
+            emqx_persistent_session:delete_message(MsgId),
+            NextId = emqx_persistent_session:next_message_id(MsgId),
+            message_gc_worker(NextId, HighWaterMark)
+    end.

+ 110 - 0
apps/emqx/src/emqx_persistent_session_mnesia_backend.erl

@@ -0,0 +1,110 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_mnesia_backend).
+
+-include("emqx.hrl").
+-include("emqx_persistent_session.hrl").
+
+-export([ create_tables/0
+        , first_message_id/0
+        , next_message_id/1
+        , delete_message/1
+        , first_session_message/0
+        , next_session_message/1
+        , delete_session_message/1
+        , put_session_store/1
+        , delete_session_store/1
+        , lookup_session_store/1
+        , put_session_message/1
+        , put_message/1
+        , get_message/1
+        , ro_transaction/1
+        ]).
+
+%% Create the three disc_copies tables of the persistent session store, all
+%% in the ?PERSISTENT_SESSION_SHARD rlog shard:
+%%   ?SESSION_STORE - set of session_store records
+%%   ?SESS_MSG_TAB  - ordered_set of session_msg records
+%%   ?MSG_TAB       - ordered_set of message records
+%% Crashes with a badmatch if any table cannot be created.
+create_tables() ->
+    ok = mria:create_table(?SESSION_STORE, [
+                {type, set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, disc_copies},
+                {record_name, session_store},
+                {attributes, record_info(fields, session_store)},
+                {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+
+    ok = mria:create_table(?SESS_MSG_TAB, [
+                {type, ordered_set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, disc_copies},
+                {record_name, session_msg},
+                {attributes, record_info(fields, session_msg)},
+                {storage_properties, [{ets, [{read_concurrency, true},
+                                             {write_concurrency, true}]}]}]),
+
+    ok = mria:create_table(?MSG_TAB, [
+                {type, ordered_set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, disc_copies},
+                {record_name, message},
+                {attributes, record_info(fields, message)},
+                {storage_properties, [{ets, [{read_concurrency, true},
+                                             {write_concurrency, true}]}]}]).
+
+%% Table access. Reads and traversal use mnesia dirty operations, writes
+%% and deletes go through the mria dirty operations.
+
+%% First key of the session message table, or '$end_of_table'.
+first_session_message() ->
+    mnesia:dirty_first(?SESS_MSG_TAB).
+
+%% Key following Key in the session message table, or '$end_of_table'.
+next_session_message(Key) ->
+    mnesia:dirty_next(?SESS_MSG_TAB, Key).
+
+%% First key of the message table, or '$end_of_table'.
+first_message_id() ->
+    mnesia:dirty_first(?MSG_TAB).
+
+%% Key following Key in the message table, or '$end_of_table'.
+next_message_id(Key) ->
+    mnesia:dirty_next(?MSG_TAB, Key).
+
+delete_message(Key) ->
+    mria:dirty_delete(?MSG_TAB, Key).
+
+delete_session_message(Key) ->
+    mria:dirty_delete(?SESS_MSG_TAB, Key).
+
+put_session_store(SS) ->
+    mria:dirty_write(?SESSION_STORE, SS).
+
+delete_session_store(ClientID) ->
+    mria:dirty_delete(?SESSION_STORE, ClientID).
+
+%% Returns `{value, SessionStore}' or `none'.
+lookup_session_store(ClientID) ->
+    case mnesia:dirty_read(?SESSION_STORE, ClientID) of
+        [] -> none;
+        [SS] -> {value, SS}
+    end.
+
+put_session_message(SessMsg) ->
+    mria:dirty_write(?SESS_MSG_TAB, SessMsg).
+
+put_message(Msg) ->
+    mria:dirty_write(?MSG_TAB, Msg).
+
+%% Read a stored message; raises error `{msg_not_found, MsgId}' when it is
+%% missing. Uses the transactional mnesia:read/2, so it has to be called
+%% from within a transaction (see ro_transaction/1).
+get_message(MsgId) ->
+    case mnesia:read(?MSG_TAB, MsgId) of
+        [] -> error({msg_not_found, MsgId});
+        [Msg] -> Msg
+    end.
+
+%% Run Fun in a read-only transaction on the persistent session shard and
+%% return its result. Crashes with a badmatch if the transaction aborts.
+ro_transaction(Fun) ->
+    {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
+    Res.
+

+ 60 - 0
apps/emqx/src/emqx_persistent_session_sup.erl

@@ -0,0 +1,60 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+%% Start the persistent session supervisor, registered locally under the
+%% module name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% Supervisor callback. Children, in start order: the resume worker
+%% supervisor, the session router pool and the GC worker.
+%% The strategy is one_for_all with intensity 0, so no restarts are
+%% tolerated: a child failure makes this supervisor give up and terminate.
+init([]) ->
+    %% We want this supervisor to own the table for restarts
+    SessionTab = emqx_session_router:create_init_tab(),
+
+    %% Resume worker sup
+    ResumeSup = #{id => router_worker_sup,
+                  start => {emqx_session_router_worker_sup, start_link, [SessionTab]},
+                  restart => permanent,
+                  shutdown => 2000,
+                  type => supervisor,
+                  modules => [emqx_session_router_worker_sup]},
+
+    %% Pool of session router processes
+    SessionRouterPool = emqx_pool_sup:spec(session_router_pool,
+                                           [session_router_pool, hash,
+                                            {emqx_session_router, start_link, []}]),
+
+    GCWorker = child_spec(emqx_persistent_session_gc, worker),
+
+    Spec = #{ strategy  => one_for_all
+            , intensity => 0
+            , period    => 1
+            },
+
+    {ok, {Spec, [ResumeSup, SessionRouterPool, GCWorker]}}.
+
+%% Child spec for a permanent worker that is started with
+%% Module:start_link() and given 15 seconds to shut down.
+child_spec(Module, worker) ->
+    #{ id       => Module
+     , start    => {Module, start_link, []}
+     , restart  => permanent
+     , shutdown => 15000
+     , type     => worker
+     , modules  => [Module]
+     }.

+ 8 - 101
apps/emqx/src/emqx_router.erl

@@ -116,8 +116,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
             ok = emqx_router_helper:monitor(Dest),
             case emqx_topic:wildcard(Topic) of
                 true  ->
-                    maybe_trans(fun insert_trie_route/1, [Route]);
-                false -> insert_direct_route(Route)
+                    Fun = fun emqx_router_utils:insert_trie_route/2,
+                    emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
+                false ->
+                    emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
             end
     end.
 
@@ -162,8 +164,10 @@ do_delete_route(Topic, Dest) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true  ->
-            maybe_trans(fun delete_trie_route/1, [Route]);
-        false -> delete_direct_route(Route)
+            Fun = fun emqx_router_utils:delete_trie_route/2,
+            emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
+        false ->
+            emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
     end.
 
 -spec(topics() -> list(emqx_types:topic())).
@@ -216,100 +220,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-insert_direct_route(Route) ->
-    mria:dirty_write(?ROUTE_TAB, Route).
-
-insert_trie_route(Route = #route{topic = Topic}) ->
-    case mnesia:wread({?ROUTE_TAB, Topic}) of
-        [] -> emqx_trie:insert(Topic);
-        _  -> ok
-    end,
-    mnesia:write(?ROUTE_TAB, Route, sticky_write).
-
-delete_direct_route(Route) ->
-    mria:dirty_delete_object(?ROUTE_TAB, Route).
-
-delete_trie_route(Route = #route{topic = Topic}) ->
-    case mnesia:wread({?ROUTE_TAB, Topic}) of
-        [Route] -> %% Remove route and trie
-                  ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
-                   emqx_trie:delete(Topic);
-        [_|_]   -> %% Remove route only
-                   mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
-        []      -> ok
-    end.
-
-%% @private
--spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
-maybe_trans(Fun, Args) ->
-    case emqx:get_config([broker, perf, route_lock_type]) of
-        key ->
-            trans(Fun, Args);
-        global ->
-            %% Assert:
-            mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash
-            lock_router(),
-            try mnesia:sync_dirty(Fun, Args)
-            after
-                unlock_router()
-            end;
-        tab ->
-            trans(fun() ->
-                          emqx_trie:lock_tables(),
-                          apply(Fun, Args)
-                  end, [])
-    end.
-
-%% The created fun only terminates with explicit exception
--dialyzer({nowarn_function, [trans/2]}).
-
--spec(trans(function(), list(any())) -> ok | {error, term()}).
-trans(Fun, Args) ->
-    {WPid, RefMon} =
-        spawn_monitor(
-            %% NOTE: this is under the assumption that crashes in Fun
-            %% are caught by mria:transaction/2.
-            %% Future changes should keep in mind that this process
-            %% always exit with database write result.
-            fun() ->
-                    Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of
-                              {atomic, Ok} -> Ok;
-                              {aborted, Reason} -> {error, Reason}
-                          end,
-                    exit({shutdown, Res})
-            end),
-    %% Receive a 'shutdown' exit to pass result from the short-lived process.
-    %% so the receive below can be receive-mark optimized by the compiler.
-    %%
-    %% If the result is sent as a regular message, we'll have to
-    %% either demonitor (with flush which is essentially a 'receive' since
-    %% the process is no longer alive after the result has been received),
-    %% or use a plain 'receive' to drain the normal 'DOWN' message.
-    %% However the compiler does not optimize this second 'receive'.
-    receive
-        {'DOWN', RefMon, process, WPid, Info} ->
-            case Info of
-                {shutdown, Result} -> Result;
-                _ -> {error, {trans_crash, Info}}
-            end
-    end.
-
-lock_router() ->
-    %% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
-    %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
-    case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
-        false ->
-            %% Force to sleep 1ms instead.
-            timer:sleep(1),
-            lock_router();
-        true ->
-            ok
-    end.
-
-unlock_router() ->
-    global:del_lock({?MODULE, self()}).

+ 126 - 0
apps/emqx/src/emqx_router_utils.erl

@@ -0,0 +1,126 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_router_utils).
+
+-include("emqx.hrl").
+
+-export([ delete_direct_route/2
+        , delete_trie_route/2
+        , insert_direct_route/2
+        , insert_trie_route/2
+        , maybe_trans/3
+        ]).
+
+%% Store Route in RouteTab with a dirty (non-transactional) write.
+insert_direct_route(RouteTab, Route) ->
+    mria:dirty_write(RouteTab, Route).
+
+%% Write Route into RouteTab. When no route exists yet for the topic, the
+%% topic is first inserted into the trie that belongs to the route table.
+insert_trie_route(RouteTab, #route{topic = Topic} = Route) ->
+    case {mnesia:wread({RouteTab, Topic}), RouteTab} of
+        {[], emqx_route}         -> emqx_trie:insert(Topic);
+        {[], emqx_session_route} -> emqx_trie:insert_session(Topic);
+        {_, _}                   -> ok
+    end,
+    mnesia:write(RouteTab, Route, sticky_write).
+
+%% Remove exactly Route from Tab with a dirty (non-transactional) delete.
+delete_direct_route(Tab, Route) ->
+    mria:dirty_delete_object(Tab, Route).
+
+%% Delete Route from RouteTab. When Route is the only record stored for
+%% its topic, the topic is removed from the matching trie as well.
+delete_trie_route(RouteTab, #route{topic = Topic} = Route) ->
+    case mnesia:wread({RouteTab, Topic}) of
+        [] ->
+            ok;
+        [Route] ->
+            %% Only this route is left for the topic: drop route and trie entry
+            ok = mnesia:delete_object(RouteTab, Route, sticky_write),
+            case RouteTab of
+                emqx_route         -> emqx_trie:delete(Topic);
+                emqx_session_route -> emqx_trie:delete_session(Topic)
+            end;
+        [_ | _] ->
+            %% Other routes still use the topic: drop the route only
+            mnesia:delete_object(RouteTab, Route, sticky_write)
+    end.
+
+%% @private
+%% Run `apply(Fun, Args)' under the locking strategy selected by the
+%% `[broker, perf, route_lock_type]' configuration value:
+%%   key    - run Fun in a mria transaction on Shard;
+%%   global - take a global lock (per module and shard) on this node and
+%%            all connected nodes, then run Fun with mnesia:sync_dirty/2;
+%%   tab    - run a mria transaction on Shard that calls
+%%            emqx_trie:lock_tables/0 before applying Fun.
+%% NOTE(review): the `tab' branch calls emqx_trie:lock_tables/0 whatever
+%% Shard is; confirm this is also what the session route tables need.
+-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}).
+maybe_trans(Fun, Args, Shard) ->
+    case emqx:get_config([broker, perf, route_lock_type]) of
+        key ->
+            trans(Fun, Args, Shard);
+        global ->
+            %% Assert:
+            mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash
+            lock_router(Shard),
+            %% The lock is released even when Fun raises.
+            try mnesia:sync_dirty(Fun, Args)
+            after
+                unlock_router(Shard)
+            end;
+        tab ->
+            trans(fun() ->
+                          emqx_trie:lock_tables(),
+                          apply(Fun, Args)
+                  end, [], Shard)
+    end.
+
+%% Run Fun(Args) as a mria transaction on Shard inside a short-lived,
+%% monitored helper process and return the transaction result: the value
+%% of an `atomic' outcome, `{error, Reason}' for an aborted transaction,
+%% or `{error, {trans_crash, Info}}' if the helper exits in any other way.
+%%
+%% The created fun only terminates with explicit exception
+-dialyzer({nowarn_function, [trans/3]}).
+
+-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}).
+trans(Fun, Args, Shard) ->
+    {WPid, RefMon} =
+        spawn_monitor(
+            %% NOTE: this is under the assumption that crashes in Fun
+            %% are caught by mria:transaction/3.
+            %% Future changes should keep in mind that this process
+            %% always exits with the database write result.
+            fun() ->
+                    Res = case mria:transaction(Shard, Fun, Args) of
+                              {atomic, Ok} -> Ok;
+                              {aborted, Reason} -> {error, Reason}
+                          end,
+                    exit({shutdown, Res})
+            end),
+    %% Receive a 'shutdown' exit to pass the result from the short-lived
+    %% process, so the receive below can be receive-mark optimized by the
+    %% compiler.
+    %%
+    %% If the result is sent as a regular message, we'll have to
+    %% either demonitor (with flush which is essentially a 'receive' since
+    %% the process is no longer alive after the result has been received),
+    %% or use a plain 'receive' to drain the normal 'DOWN' message.
+    %% However the compiler does not optimize this second 'receive'.
+    receive
+        {'DOWN', RefMon, process, WPid, Info} ->
+            case Info of
+                {shutdown, Result} -> Result;
+                _ -> {error, {trans_crash, Info}}
+            end
+    end.
+
+%% Acquire the per-shard global router lock on this node and all
+%% connected nodes, retrying every millisecond until it is obtained.
+lock_router(Shard) ->
+    %% With a non-zero Retry, global:set_lock could sleep a random time
+    %% of up to 8s. As the number of brokers is limited, retrying by hand
+    %% after a 1 ms sleep is safe.
+    LockId = {{?MODULE, Shard}, self()},
+    case global:set_lock(LockId, [node() | nodes()], 0) of
+        true ->
+            ok;
+        false ->
+            timer:sleep(1),
+            lock_router(Shard)
+    end.
+
+%% Release the per-shard global router lock held by the calling process.
+unlock_router(Shard) ->
+    LockId = {{?MODULE, Shard}, self()},
+    global:del_lock(LockId).

+ 22 - 0
apps/emqx/src/emqx_schema.erl

@@ -150,8 +150,30 @@ roots(low) ->
    , {"flapping_detect",
        sc(ref("flapping_detect"),
           #{})}
+   , {"persistent_session_store",
+       sc(ref("persistent_session_store"),
+          #{})}
     ].
 
+fields("persistent_session_store") ->
+    [ {"enabled",
+       sc(boolean(),
+          #{ default => "false"
+           })},
+      {"max_retain_undelivered",
+       sc(duration(),
+          #{ default => "1h"
+           })},
+      {"message_gc_interval",
+       sc(duration(),
+          #{ default => "1h"
+           })},
+      {"session_message_gc_interval",
+       sc(duration(),
+          #{ default => "1m"
+           })}
+    ];
+
 fields("stats") ->
     [ {"enable",
        sc(boolean(),

+ 42 - 4
apps/emqx/src/emqx_session.erl

@@ -75,6 +75,7 @@
 -export([ deliver/2
         , enqueue/2
         , dequeue/1
+        , ignore_local/3
         , retry/1
         , terminate/3
         ]).
@@ -89,9 +90,17 @@
 %% Export for CT
 -export([set_field/3]).
 
--export_type([session/0]).
+-type sessionID() :: emqx_guid:guid().
+
+-export_type([ session/0
+             , sessionID/0
+             ]).
 
 -record(session, {
+          %% sessionID, fresh for all new sessions unless it is a resumed persistent session
+          id :: sessionID(),
+          %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
+          is_persistent :: boolean(),
           %% Client’s Subscriptions.
           subscriptions :: map(),
           %% Max subscriptions allowed
@@ -129,7 +138,9 @@
 
 -type(replies() :: list(publish() | pubrel())).
 
--define(INFO_KEYS, [subscriptions,
+-define(INFO_KEYS, [id,
+                    is_persistent,
+                    subscriptions,
                     upgrade_qos,
                     retry_interval,
                     await_rel_timeout,
@@ -157,6 +168,7 @@
                     , await_rel_timeout => timeout()
                     , max_inflight => integer()
                     , mqueue => emqx_mqueue:options()
+                    , is_persistent => boolean()
                     }.
 
 %%--------------------------------------------------------------------
@@ -171,6 +183,8 @@ init(Opts) ->
                     store_qos0 => true
                    }, maps:get(mqueue, Opts, #{})),
     #session{
+       id                = emqx_guid:gen(),
+       is_persistent     = maps:get(is_persistent, Opts, false),
        max_subscriptions = maps:get(max_subscriptions, Opts, infinity),
        subscriptions     = #{},
        upgrade_qos       = maps:get(upgrade_qos, Opts, false),
@@ -195,6 +209,10 @@ info(Session) ->
 
 info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
+info(id, #session{id = Id}) ->
+    Id;
+info(is_persistent, #session{is_persistent = Bool}) ->
+    Bool;
 info(subscriptions, #session{subscriptions = Subs}) ->
     Subs;
 info(subscriptions_cnt, #session{subscriptions = Subs}) ->
@@ -236,6 +254,23 @@ info(created_at, #session{created_at = CreatedAt}) ->
 -spec(stats(session()) -> emqx_types:stats()).
 stats(Session) -> info(?STATS_KEYS, Session).
 
+%%--------------------------------------------------------------------
+%% Ignore local messages
+%%--------------------------------------------------------------------
+
+%% @doc Remove from Delivers every delivery that was published by the
+%% subscriber itself on a topic it subscribed to with the no-local
+%% option (`nl = 1'). Each removed delivery bumps the 'delivery.dropped'
+%% and 'delivery.dropped.no_local' metrics. The relative order of the
+%% remaining deliveries is preserved.
+ignore_local(Delivers, Subscriber, Session) ->
+    Subs = info(subscriptions, Session),
+    %% lists:filter/2 rather than lists:dropwhile/2: the deliveries to
+    %% drop may appear anywhere in the list, not only at its head.
+    lists:filter(fun({deliver, Topic, #message{from = Publisher}}) ->
+                     case maps:find(Topic, Subs) of
+                         {ok, #{nl := 1}} when Subscriber =:= Publisher ->
+                             ok = emqx_metrics:inc('delivery.dropped'),
+                             ok = emqx_metrics:inc('delivery.dropped.no_local'),
+                             false;
+                         _ ->
+                             true
+                     end
+                 end, Delivers).
+
 %%--------------------------------------------------------------------
 %% Client -> Broker: SUBSCRIBE
 %%--------------------------------------------------------------------
@@ -244,11 +279,12 @@ stats(Session) -> info(?STATS_KEYS, Session).
                 emqx_types:subopts(), session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
 subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
-          Session = #session{subscriptions = Subs}) ->
+          Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs}) ->
     IsNew = not maps:is_key(TopicFilter, Subs),
     case IsNew andalso is_subscriptions_full(Session) of
         false ->
             ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
+            ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS),
             ok = emqx_hooks:run('session.subscribed',
                                 [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]),
             {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
@@ -268,10 +304,12 @@ is_subscriptions_full(#session{subscriptions = Subs,
 
 -spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
-unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) ->
+unsubscribe(ClientInfo, TopicFilter, UnSubOpts,
+            Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS}) ->
     case maps:find(TopicFilter, Subs) of
         {ok, SubOpts} ->
             ok = emqx_broker:unsubscribe(TopicFilter),
+            ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
             ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
             {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
         error ->

+ 276 - 0
apps/emqx/src/emqx_session_router.erl

@@ -0,0 +1,276 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_session_router).
+
+-behaviour(gen_server).
+
+-include("emqx.hrl").
+-include("logger.hrl").
+-include("types.hrl").
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+
+-export([ create_init_tab/0
+        , start_link/2]).
+
+%% Route APIs
+-export([ delete_routes/2
+        , do_add_route/2
+        , do_delete_route/2
+        , match_routes/1
+        ]).
+
+-export([ buffer/3
+        , pending/2
+        , resume_begin/2
+        , resume_end/2
+        ]).
+
+-export([print_routes/1]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-type(group() :: binary()).
+
+-type(dest() :: node() | {group(), node()}).
+
+-define(ROUTE_TAB, emqx_session_route).
+
+-define(SESSION_INIT_TAB, session_init_tab).
+
+%%--------------------------------------------------------------------
+%% Mnesia bootstrap
+%%--------------------------------------------------------------------
+
+%% Create the session route table (a disc_copies bag of #route{} records).
+mnesia(boot) ->
+    EtsOpts = [{read_concurrency, true}, {write_concurrency, true}],
+    TabOpts = [ {type, bag}
+              , {rlog_shard, ?ROUTE_SHARD}
+              , {storage, disc_copies}
+              , {record_name, route}
+              , {attributes, record_info(fields, route)}
+              , {storage_properties, [{ets, EtsOpts}]}
+              ],
+    ok = mria:create_table(?ROUTE_TAB, TabOpts).
+
+%%--------------------------------------------------------------------
+%% Start a router
+%%--------------------------------------------------------------------
+
+%% Create the public table that maps a session id to its resume worker.
+create_init_tab() ->
+    Opts = [public, {read_concurrency, true}, {write_concurrency, true}],
+    emqx_tables:new(?SESSION_INIT_TAB, Opts).
+
+%% Start a locally registered session router belonging to pool Pool.
+-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
+start_link(Pool, Id) ->
+    Name = {local, emqx_misc:proc_name(?MODULE, Id)},
+    gen_server:start_link(Name, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
+
+%%--------------------------------------------------------------------
+%% Route APIs
+%%--------------------------------------------------------------------
+
+%% Add a route from Topic to SessionID unless it is already present.
+%% Wildcard topics go through the trie inside a transaction; plain
+%% topics are written directly.
+-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
+do_add_route(Topic, SessionID) when is_binary(Topic) ->
+    Route = #route{topic = Topic, dest = SessionID},
+    case lists:member(Route, lookup_routes(Topic)) of
+        false ->
+            case emqx_topic:wildcard(Topic) of
+                false ->
+                    emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route);
+                true ->
+                    emqx_router_utils:maybe_trans(fun emqx_router_utils:insert_trie_route/2,
+                                                  [?ROUTE_TAB, Route],
+                                                  ?PERSISTENT_SESSION_SHARD)
+            end;
+        true ->
+            ok
+    end.
+
+%% @doc Return the routes stored for Topic itself plus the routes of
+%% every topic filter in the session trie that matches it.
+-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
+match_routes(Topic) when is_binary(Topic) ->
+    case match_trie(Topic) of
+        [] ->
+            lookup_routes(Topic);
+        Filters ->
+            lists:flatmap(fun lookup_routes/1, [Topic | Filters])
+    end.
+
+%% Optimize: routing table will be replicated to all router nodes.
+%% The trie is not searched at all when the session trie is empty.
+match_trie(Topic) ->
+    case emqx_trie:empty_session() of
+        false -> emqx_trie:match_session(Topic);
+        true  -> []
+    end.
+
+%% Asynchronously ask the router picked for SessionID to delete the
+%% routes of all given subscriptions.
+delete_routes(SessionID, Subscriptions) ->
+    Router = pick(SessionID),
+    cast(Router, {delete_routes, SessionID, Subscriptions}).
+
+%% Delete the route from Topic to SessionID. Wildcard topics go through
+%% the trie inside a transaction; plain topics are deleted directly.
+-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
+do_delete_route(Topic, SessionID) ->
+    Route = #route{topic = Topic, dest = SessionID},
+    case emqx_topic:wildcard(Topic) of
+        false ->
+            emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route);
+        true ->
+            emqx_router_utils:maybe_trans(fun emqx_router_utils:delete_trie_route/2,
+                                          [?ROUTE_TAB, Route],
+                                          ?PERSISTENT_SESSION_SHARD)
+    end.
+
+%% @doc Print every route matching Topic as "Topic -> SessionID".
+-spec(print_routes(emqx_topic:topic()) -> ok).
+print_routes(Topic) ->
+    Print = fun(#route{topic = To, dest = SessionID}) ->
+                io:format("~s -> ~p~n", [To, SessionID])
+            end,
+    lists:foreach(Print, match_routes(Topic)).
+
+%%--------------------------------------------------------------------
+%% Session APIs
+%%--------------------------------------------------------------------
+
+%% Synchronously ask the router picked for SessionID for the pending
+%% messages of that session.
+pending(SessionID, MarkerIDs) ->
+    Router = pick(SessionID),
+    call(Router, {pending, SessionID, MarkerIDs}).
+
+%% Hand Msg over to the resume worker registered for SessionID, if any;
+%% a no-op when no worker is registered.
+buffer(SessionID, STopic, Msg) ->
+    Worker = emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID),
+    case Worker =:= undefined of
+        true  -> ok;
+        false -> emqx_session_router_worker:buffer(Worker, STopic, Msg)
+    end.
+
+%% Synchronously ask the router picked for SessionID to begin resuming
+%% the session on behalf of process From.
+-spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}].
+resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) ->
+    Router = pick(SessionID),
+    call(Router, {resume_begin, From, SessionID}).
+
+%% Finish resuming SessionID: fetch the result from the registered resume
+%% worker, then tell the router to clean the worker up. Returns
+%% `{error, not_found}' when no worker is registered for the session.
+-spec resume_end(pid(), binary()) ->
+          {'ok', [emqx_types:message()]} | {'error', term()}.
+resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) ->
+    case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
+        undefined ->
+            ?tp(ps_session_not_found, #{ sid => SessionID }),
+            {error, not_found};
+        WorkerPid ->
+            Reply = emqx_session_router_worker:resume_end(From, WorkerPid, SessionID),
+            cast(pick(SessionID), {resume_end, SessionID, WorkerPid}),
+            Reply
+    end.
+
+%%--------------------------------------------------------------------
+%% Worker internals
+%%--------------------------------------------------------------------
+
+%% Synchronous request to a router process, without a timeout.
+call(Server, Request) ->
+    gen_server:call(Server, Request, infinity).
+
+%% Asynchronous request to a router process.
+cast(Server, Request) ->
+    gen_server:cast(Server, Request).
+
+%% Pick the pool worker for a session, given either a route whose
+%% destination is the session id or the session id itself.
+pick(#route{dest = Dest}) ->
+    gproc_pool:pick_worker(session_router_pool, Dest);
+pick(Sid) when is_binary(Sid) ->
+    gproc_pool:pick_worker(session_router_pool, Sid).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+%% Register this process as worker {Pool, Id} of the pool and start with
+%% an empty process monitor set.
+init([Pool, Id]) ->
+    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    State = #{pool => Pool, id => Id, pmon => emqx_pmon:new()},
+    {ok, State}.
+
+%% Synchronous requests:
+%%   {resume_begin, RemotePid, SessionID} - start a resume worker for the
+%%       session, register it in the init table and reply with
+%%       `{ok, MarkerID}', or `error' when the worker cannot be started;
+%%   {pending, SessionID, MarkerIDs} - reply with the result of
+%%       emqx_persistent_session:pending_messages_in_db/2;
+%%   anything else is logged and answered with `ignored'.
+handle_call({resume_begin, RemotePid, SessionID}, _From, State) ->
+    case init_resume_worker(RemotePid, SessionID, State) of
+        error ->
+            {reply, error, State};
+        {ok, Pid, State1} ->
+            %% Register the worker before the resume marker is created.
+            ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}),
+            MarkerID = emqx_persistent_session:mark_resume_begin(SessionID),
+            {reply, {ok, MarkerID}, State1}
+    end;
+handle_call({pending, SessionID, MarkerIDs}, _From, State) ->
+    Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs),
+    {reply, Res, State};
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+%% Asynchronous requests:
+%%   {delete_routes, SessionID, Subscriptions} - delete the session's
+%%       route for every topic that is a key of the Subscriptions map;
+%%   {resume_end, SessionID, Pid} - unregister worker Pid for the session
+%%       (only if it is still the registered one), stop monitoring it and
+%%       terminate it through the worker supervisor;
+%%   anything else is logged and ignored.
+handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
+    %% TODO: Make a batch for deleting all routes.
+    Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end,
+    ok = lists:foreach(Fun, maps:to_list(Subscriptions)),
+    {noreply, State};
+handle_cast({resume_end, SessionID, Pid}, State) ->
+    case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
+        undefined -> skip;
+        P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID);
+        %% Another worker is registered for the session: leave the entry alone.
+        P when is_pid(P) -> skip
+    end,
+    Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)),
+    _ = emqx_session_router_worker_sup:abort_worker(Pid),
+    {noreply, State#{ pmon => Pmon }};
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+%% No plain message is expected by this server: log and ignore it.
+handle_info(Unexpected, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Unexpected]),
+    {noreply, State}.
+
+%% Unregister this worker from the pool on shutdown.
+terminate(_Reason, #{pool := Pool, id := Id}) ->
+    WorkerName = {Pool, Id},
+    gproc_pool:disconnect_worker(Pool, WorkerName).
+
+%% The state needs no conversion on code upgrade.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+%%--------------------------------------------------------------------
+%% Resume worker. A process that buffers the persisted messages during
+%% initialisation of a resuming session.
+%%--------------------------------------------------------------------
+
+%% Start and monitor a resume worker for SessionID on behalf of RemotePid.
+%% If a worker is already registered for the session in the init table,
+%% it is demonitored and terminated so that the new worker replaces it.
+%% Returns `{ok, WorkerPid, NewState}', or `error' when the worker cannot
+%% be started (the failure is logged).
+init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
+    case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
+        {error, What} ->
+            ?SLOG(error, #{msg => "Could not start resume worker", reason => What}),
+            error;
+        {ok, Pid} ->
+            Pmon1 = emqx_pmon:monitor(Pid, Pmon),
+            %% lookup_value/2 yields the stored worker pid itself (as every
+            %% other call site in this module relies on), not the
+            %% {SessionID, Pid} table entry.
+            case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
+                undefined ->
+                    {ok, Pid, State#{ pmon => Pmon1 }};
+                OldPid when is_pid(OldPid) ->
+                    Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1),
+                    _ = emqx_session_router_worker_sup:abort_worker(OldPid),
+                    {ok, Pid, State#{ pmon => Pmon2 }}
+            end
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Read the routes stored under Key straight from the route table.
+lookup_routes(Key) ->
+    ets:lookup(?ROUTE_TAB, Key).

+ 148 - 0
apps/emqx/src/emqx_session_router_worker.erl

@@ -0,0 +1,148 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc The session router worker is responsible for buffering
+%% messages for a persistent session while it is initializing.  If a
+%% connection process exists for a persistent session, this process is
+%% used for bridging the gap while the new connection process takes
+%% over the persistent session, but if there is no such process this
+%% worker takes its place.
+%%
+%% The workers are started on all nodes, and buffer all messages that
+%% are persisted to the session message table. In the final stage of
+%% the initialization, the messages are delivered and the worker is
+%% terminated.
+
+
+-module(emqx_session_router_worker).
+
+-behaviour(gen_server).
+
+%% API
+-export([ buffer/3
+        , pendings/1
+        , resume_end/3
+        , start_link/2
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        ]).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-record(state, { remote_pid :: pid()
+               , session_id :: binary()
+               , session_tab :: ets:table()
+               , messages :: ets:table()
+               , buffering :: boolean()
+               }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% Start a worker; SessionTab is added to the option map handed to init/1.
+start_link(SessionTab, #{} = Opts) ->
+    InitArg = Opts#{ session_tab => SessionTab},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+%% Debug API: ask the worker for its message table and remote pid.
+pendings(Worker) ->
+    gen_server:call(Worker, pendings).
+
+%% Ask worker Pid to stop buffering on behalf of RemotePid and return the
+%% buffered entries as `{ok, List}', or the error reported by the worker.
+resume_end(RemotePid, Pid, _SessionID) ->
+    case gen_server:call(Pid, {resume_end, RemotePid}) of
+        {error, _} = Failure ->
+            ?tp(ps_worker_call_failed, #{ pid => Pid
+                                        , remote_pid => RemotePid
+                                        , sid => _SessionID
+                                        , reason => Failure}),
+            Failure;
+        {ok, Tab} ->
+            ?tp(ps_worker_call_ok, #{ pid => Pid
+                                    , remote_pid => RemotePid
+                                    , sid => _SessionID}),
+            {ok, ets:tab2list(Tab)}
+    end.
+
+%% Send a message to the worker for buffering (plain send, no reply).
+buffer(Worker, STopic, Msg) ->
+    _ = erlang:send(Worker, {buffer, STopic, Msg}),
+    ok.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% Trap exits, monitor the remote process and start in buffering mode
+%% with an empty, protected, ordered message table.
+init(#{ remote_pid  := RemotePid
+      , session_id  := SessionID
+      , session_tab := SessionTab}) ->
+    process_flag(trap_exit, true),
+    erlang:monitor(process, RemotePid),
+    ?tp(ps_worker_started, #{ remote_pid => RemotePid
+                            , sid => SessionID }),
+    Messages = ets:new(?MODULE, [protected, ordered_set]),
+    State = #state{ remote_pid = RemotePid
+                  , session_id = SessionID
+                  , session_tab = SessionTab
+                  , messages = Messages
+                  , buffering = true
+                  },
+    {ok, State}.
+
+%% `pendings' exposes the message table and remote pid (debug API).
+%% `{resume_end, Pid}' stops buffering and hands out the message table,
+%% but only to the remote pid the worker was started for.
+handle_call(pendings, _From, #state{messages = Tab, remote_pid = Remote} = State) ->
+    %% Debug API
+    {reply, {Tab, Remote}, State};
+handle_call({resume_end, RemotePid}, _From, #state{remote_pid = RemotePid} = State) ->
+    ?tp(ps_worker_resume_end, #{sid => State#state.session_id}),
+    Stopped = State#state{ buffering = false },
+    {reply, {ok, State#state.messages}, Stopped};
+handle_call({resume_end, _RemotePid}, _From, State) ->
+    ?tp(ps_worker_resume_end_error, #{sid => State#state.session_id}),
+    {reply, {error, wrong_remote_pid}, State};
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%% Casts are not part of the worker protocol and are ignored.
+handle_cast(_Ignored, St) ->
+    {noreply, St}.
+
+%% Buffer requests are stored while buffering is on and dropped once it
+%% has been switched off. The worker stops when its remote process dies.
+handle_info({buffer, _STopic, _Msg}, #state{buffering = false} = State) ->
+    ?tp(ps_worker_drop_deliver, #{ sid => State#state.session_id
+                                 , msg_id => emqx_message:id(_Msg)
+                                 }),
+    {noreply, State};
+handle_info({buffer, STopic, Msg}, #state{buffering = true, messages = Tab} = State) ->
+    ?tp(ps_worker_deliver, #{ sid => State#state.session_id
+                            , msg_id => emqx_message:id(Msg)
+                            }),
+    ets:insert(Tab, {{Msg, STopic}}),
+    {noreply, State};
+handle_info({'DOWN', _, process, RemotePid, _Reason}, #state{remote_pid = RemotePid} = State) ->
+    ?tp(warning, ps_worker, #{ event => worker_remote_died
+                             , sid => State#state.session_id
+                             , msg => "Remote pid died. Exiting." }),
+    {stop, normal, State};
+handle_info(_Other, State) ->
+    {noreply, State}.
+
+%% Emit a trace point when the worker is shut down; nothing else to clean up.
+terminate(shutdown, _State) ->
+    ?tp(ps_worker_shutdown, #{ sid => _State#state.session_id }),
+    ok;
+terminate(_Reason, _State) ->
+    ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================

+ 57 - 0
apps/emqx/src/emqx_session_router_worker_sup.erl

@@ -0,0 +1,57 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_session_router_worker_sup).
+
+-behaviour(supervisor).
+
+-export([ start_link/1
+        ]).
+
+-export([ abort_worker/1
+        , start_worker/2
+        ]).
+
+-export([ init/1
+        ]).
+
+%% Start the locally registered worker supervisor.
+start_link(SessionTab) ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, SessionTab).
+
+%% Start a resume worker for SessionID that serves RemotePid.
+start_worker(SessionID, RemotePid) ->
+    WorkerOpts = #{ session_id => SessionID
+                  , remote_pid => RemotePid},
+    supervisor:start_child(?MODULE, [WorkerOpts]).
+
+%% Terminate the given worker through the supervisor.
+abort_worker(WorkerPid) ->
+    supervisor:terminate_child(?MODULE, WorkerPid).
+
+%%--------------------------------------------------------------------
+%% Supervisor callbacks
+%%--------------------------------------------------------------------
+
+%% simple_one_for_one supervisor of transient resume workers; SessionTab
+%% is passed as the first argument to every worker's start_link.
+init(SessionTab) ->
+    SupFlags = #{ strategy  => simple_one_for_one
+                , intensity => 1
+                , period    => 5},
+    %% Resume worker
+    ChildSpec = #{ id => session_router_worker
+                 , start => {emqx_session_router_worker, start_link, [SessionTab]}
+                 , restart => transient
+                 , shutdown => 2000
+                 , type => worker
+                 , modules => [emqx_session_router_worker]},
+    {ok, {SupFlags, [ChildSpec]}}.

+ 2 - 0
apps/emqx/src/emqx_sup.erl

@@ -65,9 +65,11 @@ init([]) ->
     KernelSup = child_spec(emqx_kernel_sup, supervisor),
     RouterSup = child_spec(emqx_router_sup, supervisor),
     BrokerSup = child_spec(emqx_broker_sup, supervisor),
+    SessionSup = child_spec(emqx_persistent_session_sup, supervisor),
     CMSup = child_spec(emqx_cm_sup, supervisor),
     SysSup = child_spec(emqx_sys_sup, supervisor),
     Children = [KernelSup] ++
+               [SessionSup || emqx_persistent_session:is_store_enabled()] ++
                [RouterSup || emqx_boot:is_enabled(router)] ++
                [BrokerSup || emqx_boot:is_enabled(broker)] ++
                [CMSup || emqx_boot:is_enabled(broker)] ++

+ 98 - 48
apps/emqx/src/emqx_trie.erl

@@ -19,18 +19,25 @@
 -include("emqx.hrl").
 
 %% Mnesia bootstrap
--export([mnesia/1]).
+-export([ mnesia/1
+        , create_session_trie/0
+        ]).
 
 -boot_mnesia({mnesia, [boot]}).
 
 %% Trie APIs
 -export([ insert/1
+        , insert_session/1
         , match/1
+        , match_session/1
         , delete/1
+        , delete_session/1
         ]).
 
 -export([ empty/0
+        , empty_session/0
         , lock_tables/0
+        , lock_session_tables/0
         ]).
 
 -export([is_compact/0, set_compact/1]).
@@ -41,6 +48,7 @@
 -endif.
 
 -define(TRIE, emqx_trie).
+-define(SESSION_TRIE, emqx_session_trie).
 -define(PREFIX(Prefix), {Prefix, 0}).
 -define(TOPIC(Topic), {Topic, 1}).
 
@@ -62,12 +70,23 @@ mnesia(boot) ->
                         ]}],
     ok = mria:create_table(?TRIE, [
                 {rlog_shard, ?ROUTE_SHARD},
-                {storage, ram_copies},
                 {record_name, ?TRIE},
                 {attributes, record_info(fields, ?TRIE)},
                 {type, ordered_set},
                 {storage_properties, StoreProps}]).
 
+%% Create the disc_copies ordered_set table backing the session trie.
+create_session_trie() ->
+    EtsOpts = [{read_concurrency, true}, {write_concurrency, true}],
+    TabOpts = [ {rlog_shard, ?ROUTE_SHARD}
+              , {storage, disc_copies}
+              , {record_name, ?TRIE}
+              , {attributes, record_info(fields, ?TRIE)}
+              , {type, ordered_set}
+              , {storage_properties, [{ets, EtsOpts}]}
+              ],
+    ok = mria:create_table(?SESSION_TRIE, TabOpts).
+
 %%--------------------------------------------------------------------
 %% Topics APIs
 %%--------------------------------------------------------------------
@@ -75,24 +94,46 @@ mnesia(boot) ->
 %% @doc Insert a topic filter into the trie.
 -spec(insert(emqx_types:topic()) -> ok).
 insert(Topic) when is_binary(Topic) ->
+    insert(Topic, ?TRIE).
+
+-spec(insert_session(emqx_topic:topic()) -> ok).
+insert_session(Topic) when is_binary(Topic) ->
+    insert(Topic, ?SESSION_TRIE).
+
+insert(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
-    case mnesia:wread({?TRIE, TopicKey}) of
+    case mnesia:wread({Trie, TopicKey}) of
         [_] -> ok; %% already inserted
-        [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
+        [] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys])
     end.
 
 %% @doc Delete a topic filter from the trie.
 -spec(delete(emqx_types:topic()) -> ok).
 delete(Topic) when is_binary(Topic) ->
+    delete(Topic, ?TRIE).
+
+%% @doc Delete a topic filter from the session trie.
+-spec(delete_session(emqx_topic:topic()) -> ok).
+delete_session(Topic) when is_binary(Topic) ->
+    delete(Topic, ?SESSION_TRIE).
+
+delete(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
-    case [] =/= mnesia:wread({?TRIE, TopicKey}) of
-        true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
+    case [] =/= mnesia:wread({Trie, TopicKey}) of
+        true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]);
         false -> ok
     end.
 
 %% @doc Find trie nodes that match the topic name.
 -spec(match(emqx_types:topic()) -> list(emqx_types:topic())).
 match(Topic) when is_binary(Topic) ->
+    match(Topic, ?TRIE).
+
+-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
+match_session(Topic) when is_binary(Topic) ->
+    match(Topic, ?SESSION_TRIE).
+
+match(Topic, Trie) when is_binary(Topic) ->
     Words = emqx_topic:words(Topic),
     case emqx_topic:wildcard(Words) of
         true ->
@@ -105,17 +146,26 @@ match(Topic) when is_binary(Topic) ->
             %% Such clients will get disconnected.
             [];
         false ->
-            do_match(Words)
+            do_match(Words, Trie)
     end.
 
 %% @doc Is the trie empty?
 -spec(empty() -> boolean()).
-empty() -> ets:first(?TRIE) =:= '$end_of_table'.
+empty() -> empty(?TRIE).
+
+empty_session() ->
+    empty(?SESSION_TRIE).
+
+empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
 
 -spec lock_tables() -> ok.
 lock_tables() ->
     mnesia:write_lock_table(?TRIE).
 
+-spec lock_session_tables() -> ok.
+lock_session_tables() ->
+    mnesia:write_lock_table(?SESSION_TRIE).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -163,70 +213,70 @@ make_prefixes([H | T], Prefix0, Acc0) ->
     Acc = [Prefix | Acc0],
     make_prefixes(T, Prefix, Acc).
 
-insert_key(Key) ->
-    T = case mnesia:wread({?TRIE, Key}) of
+insert_key(Key, Trie) ->
+    T = case mnesia:wread({Trie, Key}) of
             [#?TRIE{count = C} = T1] ->
                 T1#?TRIE{count = C + 1};
              [] ->
                 #?TRIE{key = Key, count = 1}
          end,
-    ok = mnesia:write(T).
+    ok = mnesia:write(Trie, T, write).
 
-delete_key(Key) ->
-    case mnesia:wread({?TRIE, Key}) of
+delete_key(Key, Trie) ->
+    case mnesia:wread({Trie, Key}) of
         [#?TRIE{count = C} = T] when C > 1 ->
-            ok = mnesia:write(T#?TRIE{count = C - 1});
+            ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write);
         [_] ->
-            ok = mnesia:delete(?TRIE, Key, write);
+            ok = mnesia:delete(Trie, Key, write);
         [] ->
             ok
     end.
 
 %% micro-optimization: no need to lookup when topic is not wildcard
 %% because we only insert wildcards to emqx_trie
-lookup_topic(_Topic, false) -> [];
-lookup_topic(Topic, true) -> lookup_topic(Topic).
+lookup_topic(_Topic,_Trie, false) -> [];
+lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie).
 
-lookup_topic(Topic) when is_binary(Topic) ->
-    case ets:lookup(?TRIE, ?TOPIC(Topic)) of
+lookup_topic(Topic, Trie) when is_binary(Topic) ->
+    case ets:lookup(Trie, ?TOPIC(Topic)) of
         [#?TRIE{count = C}] -> [Topic || C > 0];
         [] -> []
     end.
 
-has_prefix(empty) -> true; %% this is the virtual tree root
-has_prefix(Prefix) ->
-    case ets:lookup(?TRIE, ?PREFIX(Prefix)) of
+has_prefix(empty, _Trie) -> true; %% this is the virtual tree root
+has_prefix(Prefix, Trie) ->
+    case ets:lookup(Trie, ?PREFIX(Prefix)) of
         [#?TRIE{count = C}] -> C > 0;
         [] -> false
     end.
 
-do_match([<<"$", _/binary>> = Prefix | Words]) ->
+do_match([<<"$", _/binary>> = Prefix | Words], Trie) ->
     %% For topics having dollar sign prefix,
     %% we do not match root level + or #,
     %% fast forward to the next level.
     case Words =:= [] of
-        true -> lookup_topic(Prefix);
+        true -> lookup_topic(Prefix, Trie);
         false -> []
-    end ++ do_match(Words, Prefix);
-do_match(Words) ->
-    do_match(Words, empty).
+    end ++ do_match(Words, Prefix, Trie);
+do_match(Words, Trie) ->
+    do_match(Words, empty, Trie).
 
-do_match(Words, Prefix) ->
+do_match(Words, Prefix, Trie) ->
     case is_compact() of
-        true -> match_compact(Words, Prefix, false, []);
-        false -> match_no_compact(Words, Prefix, false, [])
+        true -> match_compact(Words, Prefix, Trie, false, []);
+        false -> match_no_compact(Words, Prefix, Trie, false, [])
     end.
 
-match_no_compact([], Topic, IsWildcard, Acc) ->
-    'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
-    lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
+match_no_compact([], Topic, Trie, IsWildcard, Acc) ->
+    'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/#
+    lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+
     Acc;
-match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
-    case has_prefix(Prefix) of
+match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
+    case has_prefix(Prefix, Trie) of
         true ->
-            Acc1 = 'match_#'(Prefix) ++ Acc0,
-            Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
-            match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
+            Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
+            Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1),
+            match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc);
         false ->
             %% non-compact paths in database
             %% if there is no prefix matches the current topic prefix
@@ -243,26 +293,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
             Acc0
     end.
 
-match_compact([], Topic, IsWildcard, Acc) ->
-    'match_#'(Topic) ++ %% try match foo/bar/#
-    lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
+match_compact([], Topic, Trie, IsWildcard, Acc) ->
+    'match_#'(Topic, Trie) ++ %% try match foo/bar/#
+    lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar
     Acc;
-match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
-    Acc1 = 'match_#'(Prefix) ++ Acc0,
-    Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
+match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
+    Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
+    Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1),
     WildcardPrefix = join(Prefix, '+'),
     %% go deeper to match current_prefix/+ only when:
     %% 1. current word is the last
     %% OR
     %% 2. there is a prefix = 'current_prefix/+'
-    case Words =:= [] orelse has_prefix(WildcardPrefix) of
-        true -> match_compact(Words, WildcardPrefix, true, Acc);
+    case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of
+        true -> match_compact(Words, WildcardPrefix, Trie, true, Acc);
         false -> Acc
     end.
 
-'match_#'(Prefix) ->
+'match_#'(Prefix, Trie) ->
     MlTopic = join(Prefix, '#'),
-    lookup_topic(MlTopic).
+    lookup_topic(MlTopic, Trie).
 
 is_compact() ->
     emqx:get_config([broker, perf, trie_compaction], true).

+ 4 - 2
apps/emqx/test/emqx_cm_SUITE.erl

@@ -98,6 +98,7 @@ t_open_session(_) ->
                  sockname => {{127,0,0,1}, 1883},
                  peercert => nossl,
                  conn_mod => emqx_connection,
+                 expiry_interval => 0,
                  receive_maximum => 100},
     {ok, #{session := Session1, present := false}}
         = emqx_cm:open_session(true, ClientInfo, ConnInfo),
@@ -123,6 +124,7 @@ t_open_session_race_condition(_) ->
                  sockname => {{127,0,0,1}, 1883},
                  peercert => nossl,
                  conn_mod => emqx_connection,
+                 expiry_interval => 0,
                  receive_maximum => 100},
 
     Parent = self(),
@@ -219,7 +221,7 @@ t_discard_session_race(_) ->
 
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,
-    {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
+    none = emqx_cm:takeover_session(<<"clientid">>),
     erlang:spawn_link(fun() ->
         ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
         receive
@@ -228,7 +230,7 @@ t_takeover_session(_) ->
         end
     end),
     timer:sleep(100),
-    {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
+    {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
     emqx_cm:unregister_channel(<<"clientid">>).
 
 t_kick_session(_) ->

+ 0 - 32
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -131,7 +131,6 @@ clean_retained(Topic, Config) ->
 t_basic_test(Config) ->
     ConnFun = ?config(conn_fun, Config),
     Topic = nth(1, ?TOPICS),
-    ct:print("Basic test starting"),
     {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
     {ok, _} = emqtt:ConnFun(C),
     {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
@@ -333,37 +332,6 @@ t_connect_keepalive_timeout(Config) ->
         error("keepalive timeout")
     end.
 
-%% [MQTT-3.1.2-23]
-t_connect_session_expiry_interval(Config) ->
-    ConnFun = ?config(conn_fun, Config),
-    Topic = nth(1, ?TOPICS),
-    Payload = "test message",
-
-    {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
-                                       {proto_ver, v5},
-                                       {properties, #{'Session-Expiry-Interval' => 7200}}
-                                       | Config
-                                    ]),
-    {ok, _} = emqtt:ConnFun(Client1),
-    {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
-    ok = emqtt:disconnect(Client1),
-
-    {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
-    {ok, _} = emqtt:ConnFun(Client2),
-    {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
-    ok = emqtt:disconnect(Client2),
-
-    {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
-                                       {proto_ver, v5},
-                                       {clean_start, false} | Config
-                                    ]),
-    {ok, _} = emqtt:ConnFun(Client3),
-    [Msg | _ ] = receive_messages(1),
-    ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
-    ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
-    ?assertEqual({ok, 2}, maps:find(qos, Msg)),
-    ok = emqtt:disconnect(Client3).
-
 %% [MQTT-3.1.3-9]
 %% !!!REFACTOR NEED:
 %t_connect_will_delay_interval(Config) ->

Plik diff jest za duży
+ 1061 - 0
apps/emqx/test/emqx_persistent_session_SUITE.erl


+ 5 - 0
apps/emqx/test/emqx_proper_types.erl

@@ -100,6 +100,8 @@ clientinfo() ->
 %% See emqx_session:session() type define
 sessioninfo() ->
     ?LET(Session, {session,
+                    sessionid(),        % id
+                    boolean(),          % is_persistent
                     subscriptions(),    % subscriptions
                     non_neg_integer(),  % max_subscriptions
                     boolean(),          % upgrade_qos
@@ -114,6 +116,9 @@ sessioninfo() ->
                   },
          emqx_session:info(Session)).
 
+sessionid() ->
+    emqx_guid:gen().
+
 subscriptions() ->
     ?LET(L, list({topic(), subopts()}), maps:from_list(L)).
 

+ 1 - 1
apps/emqx/test/emqx_trie_SUITE.erl

@@ -183,7 +183,7 @@ t_delete3(_) ->
               ?TRIE:delete(<<"sensor/+/unknown">>)
           end),
     ?assertEqual([], ?TRIE:match(<<"sensor">>)),
-    ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)).
+    ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)).
 
 clear_tables() -> emqx_trie:clear_tables().