Просмотр исходного кода

feat(ds): open iterators when handling `SUBSCRIBE` packets

Fixes https://emqx.atlassian.net/browse/EMQX-9741
Thales Macedo Garitezi 2 года назад
Родитель
Коммит
9463e271c0

+ 1 - 2
apps/emqx/include/emqx.hrl

@@ -23,7 +23,6 @@
 -define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
 -define(CM_SHARD, emqx_cm_shard).
 -define(ROUTE_SHARD, route_shard).
--define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
 
 %% Banner
 %%--------------------------------------------------------------------
@@ -92,7 +91,7 @@
 
 -record(route, {
     topic :: binary(),
-    dest :: node() | {binary(), node()} | emqx_session:sessionID()
+    dest :: node() | {binary(), node()} | emqx_session:session_id()
 }).
 
 %%--------------------------------------------------------------------

+ 57 - 0
apps/emqx/include/emqx_session.hrl

@@ -0,0 +1,57 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_SESSION_HRL).
+-define(EMQX_SESSION_HRL, true).
+
+-record(session, {
+    %% Client's id
+    clientid :: emqx_types:clientid(),
+    id :: emqx_session:session_id(),
+    %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
+    is_persistent :: boolean(),
+    %% Client’s Subscriptions.
+    subscriptions :: map(),
+    %% Max subscriptions allowed
+    max_subscriptions :: non_neg_integer() | infinity,
+    %% Upgrade QoS?
+    upgrade_qos :: boolean(),
+    %% Client <- Broker: QoS1/2 messages sent to the client that
+    %% have not been acked yet.
+    inflight :: emqx_inflight:inflight(),
+    %% All QoS1/2 messages published while the client is disconnected,
+    %% or QoS1/2 messages pending transmission to the Client.
+    %%
+    %% Optionally, QoS0 messages pending transmission to the Client.
+    mqueue :: emqx_mqueue:mqueue(),
+    %% Next packet id of the session
+    next_pkt_id = 1 :: emqx_types:packet_id(),
+    %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
+    retry_interval :: timeout(),
+    %% Client -> Broker: QoS2 messages received from the client, but
+    %% have not been completely acknowledged
+    awaiting_rel :: map(),
+    %% Maximum number of awaiting QoS2 messages allowed
+    max_awaiting_rel :: non_neg_integer() | infinity,
+    %% Awaiting PUBREL Timeout (Unit: millisecond)
+    await_rel_timeout :: timeout(),
+    %% Created at
+    created_at :: pos_integer(),
+    %% Durable storage iterators for existing subscriptions
+    iterators = [] :: [emqx_ds_replay:replay_id()]
+}).
+
+-endif.

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -15,6 +15,7 @@
 {emqx_conf,3}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
+{emqx_ds,1}.
 {emqx_eviction_agent,1}.
 {emqx_exhook,1}.
 {emqx_ft_storage_exporter_fs,1}.

+ 11 - 1
apps/emqx/src/emqx_cm.erl

@@ -21,6 +21,7 @@
 
 -include("emqx.hrl").
 -include("emqx_cm.hrl").
+-include("emqx_session.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -301,7 +302,16 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
 
 create_session(ClientInfo, ConnInfo) ->
     Options = get_session_confs(ClientInfo, ConnInfo),
-    Session = emqx_session:init(Options),
+    #{clientid := ClientID} = ClientInfo,
+    Session0 = emqx_session:init(Options),
+    IteratorIDs =
+        case emqx_persistent_session_ds:open_session(ClientID) of
+            {skipped, disabled} ->
+                [];
+            {_IsNew, _DSSessionID, Iterators0} ->
+                Iterators0
+        end,
+    Session = Session0#session{iterators = IteratorIDs},
     ok = emqx_metrics:inc('session.created'),
     ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
     Session.

+ 48 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -16,15 +16,24 @@
 
 -module(emqx_persistent_session_ds).
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -export([init/0]).
 
--export([persist_message/1]).
+-export([
+    persist_message/1,
+    open_session/1,
+    add_subscription/2
+]).
 
 -export([
     serialize_message/1,
     deserialize_message/1
 ]).
 
+%% RPC
+-export([do_open_iterator/3]).
+
 %% FIXME
 -define(DS_SHARD, <<"local">>).
 
@@ -72,6 +81,44 @@ store_message(Msg) ->
 find_subscribers(_Msg) ->
     [node()].
 
+open_session(ClientID) ->
+    ?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
+
+%% @doc Registers a durable-storage subscription for `TopicFilterBin' in the DS
+%% session `DSSessionID'.
+%%
+%% The binary topic filter is split into words, then an iterator is registered via
+%% `emqx_ds:session_add_iterator/2'.  Only when that call reports a new iterator is
+%% the iterator opened on all running nodes.
+%%
+%% Returns `{ok, IteratorID, IsNew}'.  Per the spec, `{skipped, disabled}' is the
+%% other possible result; presumably it is produced by `?WHEN_ENABLED' (defined
+%% outside this hunk) when the DS backend is disabled -- TODO confirm.
+%%
+%% NOTE(review): an `{error, session_not_found}' reply from
+%% `emqx_ds:session_add_iterator/2' is not handled and fails the match below.
+-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
+    {ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}.
+add_subscription(TopicFilterBin, DSSessionID) ->
+    ?WHEN_ENABLED(
+        begin
+            TopicFilter = emqx_topic:words(TopicFilterBin),
+            {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
+                DSSessionID, TopicFilter
+            ),
+            case IsNew of
+                true ->
+                    ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID);
+                false ->
+                    ok
+            end,
+            {ok, IteratorID, IsNew}
+        end
+    ).
+
+%% Opens the iterator `IteratorID' on every running node through the
+%% `emqx_ds_proto_v1' RPC and returns `ok' only if every node answered `{ok, ok}'.
+%%
+%% Any other per-node result makes the call crash with
+%% `{open_iterator_failed, IteratorID, [{Node, Result}]}', so that the failing
+%% nodes and their reasons show up in the crash report.
+-spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
+open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) ->
+    Nodes = emqx:running_nodes(),
+    Results = emqx_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID),
+    %% `erpc:multicall' yields one result per node, in the same order as `Nodes'.
+    %% TODO: handle errors gracefully (retry / roll back) instead of crashing.
+    case [{Node, Res} || {Node, Res} <- lists:zip(Nodes, Results), Res =/= {ok, ok}] of
+        [] ->
+            ok;
+        Failures ->
+            error({open_iterator_failed, IteratorID, Failures})
+    end.
+
+%% RPC target (exported under "RPC" above; invoked via `emqx_ds_proto_v1:open_iterator/4').
+%% Builds a storage-layer iterator for the replay `{TopicFilter, StartMS}' on the
+%% `?DS_SHARD' shard and preserves it under `IteratorID'.
+%% Crashes with a badmatch if either storage-layer call does not succeed.
+-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
+do_open_iterator(TopicFilter, StartMS, IteratorID) ->
+    Replay = {TopicFilter, StartMS},
+    %% FIXME: choose DS shard based on ...?
+    {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay),
+    ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
+    ok.
+
 %%
 
 serialize_message(Msg) ->

+ 22 - 40
apps/emqx/src/emqx_session.erl

@@ -44,6 +44,7 @@
 -module(emqx_session).
 
 -include("emqx.hrl").
+-include("emqx_session.hrl").
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("types.hrl").
@@ -101,49 +102,13 @@
 %% Export for CT
 -export([set_field/3]).
 
--type sessionID() :: emqx_guid:guid().
+-type session_id() :: emqx_guid:guid().
 
 -export_type([
     session/0,
-    sessionID/0
+    session_id/0
 ]).
 
--record(session, {
-    %% Client's id
-    clientid :: emqx_types:clientid(),
-    id :: sessionID(),
-    %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
-    is_persistent :: boolean(),
-    %% Client’s Subscriptions.
-    subscriptions :: map(),
-    %% Max subscriptions allowed
-    max_subscriptions :: non_neg_integer() | infinity,
-    %% Upgrade QoS?
-    upgrade_qos :: boolean(),
-    %% Client <- Broker: QoS1/2 messages sent to the client but
-    %% have not been unacked.
-    inflight :: emqx_inflight:inflight(),
-    %% All QoS1/2 messages published to when client is disconnected,
-    %% or QoS1/2 messages pending transmission to the Client.
-    %%
-    %% Optionally, QoS0 messages pending transmission to the Client.
-    mqueue :: emqx_mqueue:mqueue(),
-    %% Next packet id of the session
-    next_pkt_id = 1 :: emqx_types:packet_id(),
-    %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
-    retry_interval :: timeout(),
-    %% Client -> Broker: QoS2 messages received from the client, but
-    %% have not been completely acknowledged
-    awaiting_rel :: map(),
-    %% Maximum number of awaiting QoS2 messages allowed
-    max_awaiting_rel :: non_neg_integer() | infinity,
-    %% Awaiting PUBREL Timeout (Unit: millisecond)
-    await_rel_timeout :: timeout(),
-    %% Created at
-    created_at :: pos_integer()
-    %% Message deliver latency stats
-}).
-
 -type inflight_data_phase() :: wait_ack | wait_comp.
 
 -record(inflight_data, {
@@ -297,7 +262,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
 info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt.
+    CreatedAt;
+info(iterators, #session{iterators = IteratorIds}) ->
+    IteratorIds.
 
 %% @doc Get stats of the session.
 -spec stats(session()) -> emqx_types:stats().
@@ -324,11 +291,13 @@ subscribe(
     case IsNew andalso is_subscriptions_full(Session) of
         false ->
             ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
+            Session1 = Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)},
+            Session2 = add_persistent_subscription(TopicFilter, ClientId, Session1),
             ok = emqx_hooks:run(
                 'session.subscribed',
                 [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
             ),
-            {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
+            {ok, Session2};
         true ->
             {error, ?RC_QUOTA_EXCEEDED}
     end.
@@ -341,6 +310,19 @@ is_subscriptions_full(#session{
 }) ->
     maps:size(Subs) >= MaxLimit.
 
+%% Registers the subscription with the durable-storage session layer and records
+%% the resulting iterator id in the session.
+%%
+%% `ClientId' is passed where `emqx_persistent_session_ds:add_subscription/2'
+%% expects the DS session id.  The session is returned unchanged when the call is
+%% skipped or the iterator already existed; a new iterator id is prepended to
+%% `iterators'.
+-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
+    session().
+add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
+    case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
+        {skipped, disabled} ->
+            Session;
+        {ok, IteratorID, _IsNew = true} ->
+            Iterators = Session#session.iterators,
+            Session#session{iterators = [IteratorID | Iterators]};
+        {ok, _IteratorID, _IsNew = false} ->
+            Session
+    end.
+
 %%--------------------------------------------------------------------
 %% Client -> Broker: UNSUBSCRIBE
 %%--------------------------------------------------------------------

+ 1 - 0
apps/emqx/src/emqx_session_router.erl

@@ -21,6 +21,7 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("persistent_session/emqx_persistent_session.hrl").
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 

+ 8 - 8
apps/emqx/src/persistent_session/emqx_persistent_session.erl

@@ -115,10 +115,10 @@ storage_backend() ->
 %% Session message ADT API
 %%--------------------------------------------------------------------
 
--spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term().
+-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term().
 session_message_info(timestamp, {_, <<>>, <<TS:64>>, ?ABANDONED}) -> TS;
 session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID);
-session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID.
+session_message_info(session_id, {SessionID, _, _, _}) -> SessionID.
 
 %%--------------------------------------------------------------------
 %% DB API
@@ -243,7 +243,7 @@ discard_opt(true, ClientID, Session) ->
     emqx_session_router:delete_routes(SessionID, Subscriptions),
     emqx_session:set_field(is_persistent, false, Session).
 
--spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid().
+-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid().
 mark_resume_begin(SessionID) ->
     MarkerID = emqx_guid:gen(),
     put_session_message({SessionID, MarkerID, <<>>, ?MARKER}),
@@ -396,12 +396,12 @@ do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) ->
 do_mark_as_delivered(_SessionID, []) ->
     ok.
 
--spec pending(emqx_session:sessionID()) ->
+-spec pending(emqx_session:session_id()) ->
     [{emqx_types:message(), STopic :: binary()}].
 pending(SessionID) ->
     pending_messages_in_db(SessionID, []).
 
--spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) ->
+-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) ->
     [{emqx_types:message(), STopic :: binary()}].
 pending(SessionID, MarkerIds) ->
     %% TODO: Handle lost MarkerIDs
@@ -460,8 +460,8 @@ read_pending_msgs([], Acc) ->
     lists:reverse(Acc).
 
 %% The keys are ordered by
-%%     {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
-%%     {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
+%%     {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired).
+%%     {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER}
 %%  where
 %%     <<>> < emqx_guid:guid()
 %%     <<>> < bin_timestamp()
@@ -491,7 +491,7 @@ pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, Mar
                 false -> pending_messages(Key, Acc, MarkerIds);
                 true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds)
             end;
-        %% Next sessionID or '$end_of_table'
+        %% Next session_id or '$end_of_table'
         _What ->
             case PrevTag =:= ?UNDELIVERED of
                 false -> {lists:reverse(Acc), MarkerIds};

+ 2 - 0
apps/emqx/src/persistent_session/emqx_persistent_session.hrl

@@ -14,6 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
+-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
+
 -record(session_store, {
     client_id :: binary(),
     expiry_interval :: non_neg_integer(),

+ 1 - 0
apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl

@@ -56,6 +56,7 @@ start_link() ->
 
 init([]) ->
     process_flag(trap_exit, true),
+    mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD),
     {ok, start_message_gc_timer(start_session_gc_timer(#{}))}.
 
 handle_call(_Request, _From, State) ->

+ 49 - 0
apps/emqx/src/proto/emqx_ds_proto_v1.erl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    open_iterator/4
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 30_000).
+
+introduced_in() ->
+    %% FIXME
+    "5.3.0".
+
+%% @doc Calls `emqx_persistent_session_ds:do_open_iterator/3' on each node in
+%% `Nodes' via `erpc:multicall/5', using a `?TIMEOUT' (30 s) timeout.
+%% Returns the raw multicall result list; callers must inspect each entry.
+-spec open_iterator(
+    [node()],
+    emqx_topic:words(),
+    emqx_ds:time(),
+    emqx_ds:iterator_id()
+) ->
+    emqx_rpc:erpc_multicall(ok).
+open_iterator(Nodes, TopicFilter, StartMS, IteratorID) ->
+    erpc:multicall(
+        Nodes,
+        emqx_persistent_session_ds,
+        do_open_iterator,
+        [TopicFilter, StartMS, IteratorID],
+        ?TIMEOUT
+    ).

+ 4 - 1
apps/emqx/test/emqx_cth_cluster.erl

@@ -20,6 +20,7 @@
 -export([stop/1]).
 
 -export([share_load_module/2]).
+-export([node_name/1]).
 
 -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
 
@@ -83,7 +84,7 @@ when
     }.
 start(Nodes, ClusterOpts) ->
     NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
-    ct:pal("Starting cluster: ~p", [NodeSpecs]),
+    ct:pal("Starting cluster:\n  ~p", [NodeSpecs]),
     % 1. Start bare nodes with only basic applications running
     _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
     % 2. Start applications needed to enable clustering
@@ -237,6 +238,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) ->
             listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)
         }
     };
+default_appspec(emqx, Spec = #{listeners := true}, _NodeSpecs) ->
+    #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}};
 default_appspec(_App, _, _) ->
     #{}.
 

+ 180 - 14
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -17,6 +17,8 @@
 -module(emqx_persistent_messages_SUITE).
 
 -include_lib("stdlib/include/assert.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -24,25 +26,38 @@
 -define(NOW,
     (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
 ).
+-define(DS_SHARD, <<"local">>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    {ok, _} = application:ensure_all_started(emqx_durable_storage),
-    ok = emqx_common_test_helpers:start_apps([], fun
-        (emqx) ->
-            emqx_common_test_helpers:boot_modules(all),
-            emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>),
-            emqx_app:set_config_loader(?MODULE);
-        (_) ->
-            ok
-    end),
+    %% avoid inter-suite flakiness...
+    application:stop(emqx),
+    application:stop(emqx_durable_storage),
+    TCApps = emqx_cth_suite:start(
+        app_specs(),
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{tc_apps, TCApps} | Config].
+
+end_per_suite(Config) ->
+    TCApps = ?config(tc_apps, Config),
+    emqx_cth_suite:stop(TCApps),
+    ok.
+
+init_per_testcase(t_session_subscription_iterators, Config) ->
+    Cluster = cluster(),
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
+    [{nodes, Nodes} | Config];
+init_per_testcase(_TestCase, Config) ->
     Config.
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([]),
-    application:stop(emqx_durable_storage),
+end_per_testcase(t_session_subscription_iterators, Config) ->
+    Nodes = ?config(nodes, Config),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
+end_per_testcase(_TestCase, _Config) ->
     ok.
 
 t_messages_persisted(_Config) ->
@@ -76,7 +91,7 @@ t_messages_persisted(_Config) ->
 
     ct:pal("Results = ~p", [Results]),
 
-    Persisted = consume(<<"local">>, {['#'], 0}),
+    Persisted = consume(?DS_SHARD, {['#'], 0}),
 
     ct:pal("Persisted = ~p", [Persisted]),
 
@@ -88,6 +103,98 @@ t_messages_persisted(_Config) ->
 
     ok.
 
+%% TODO: test quic and ws too
+t_session_subscription_iterators(Config) ->
+    [Node1, Node2] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    Topic = <<"t/topic">>,
+    SubTopicFilter = <<"t/+">>,
+    AnotherTopic = <<"u/another-topic">>,
+    ClientId = <<"myclientid">>,
+    ?check_trace(
+        begin
+            [
+                Payload1,
+                Payload2,
+                Payload3,
+                Payload4
+            ] = lists:map(
+                fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end,
+                lists:seq(1, 4)
+            ),
+            ct:pal("starting"),
+            {ok, Client} = emqtt:start_link([
+                {port, Port},
+                {clientid, ClientId},
+                {proto_ver, v5}
+            ]),
+            {ok, _} = emqtt:connect(Client),
+            ct:pal("publishing 1"),
+            Message1 = emqx_message:make(Topic, Payload1),
+            publish(Node1, Message1),
+            receive_messages(1),
+            ct:pal("subscribing 1"),
+            {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
+            ct:pal("publishing 2"),
+            Message2 = emqx_message:make(Topic, Payload2),
+            publish(Node1, Message2),
+            receive_messages(1),
+            ct:pal("subscribing 2"),
+            {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1),
+            ct:pal("publishing 3"),
+            Message3 = emqx_message:make(Topic, Payload3),
+            publish(Node1, Message3),
+            receive_messages(1),
+            ct:pal("publishing 4"),
+            Message4 = emqx_message:make(AnotherTopic, Payload4),
+            publish(Node1, Message4),
+            IteratorIds = get_iterator_ids(Node1, ClientId),
+            emqtt:stop(Client),
+            #{
+                messages => [Message1, Message2, Message3, Message4],
+                iterator_ids => IteratorIds
+            }
+        end,
+        fun(Results, Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            #{
+                messages := [_Message1, Message2, Message3 | _],
+                iterator_ids := IteratorIds
+            } = Results,
+            case ?of_kind(ds_session_subscription_added, Trace) of
+                [] ->
+                    %% Since `emqx_durable_storage' is a dependency of `emqx', it gets
+                    %% compiled in "prod" mode when running emqx standalone tests.
+                    ok;
+                [_ | _] ->
+                    ?assertMatch(
+                        [
+                            #{?snk_kind := ds_session_subscription_added},
+                            #{?snk_kind := ds_session_subscription_present}
+                        ],
+                        ?of_kind(
+                            [
+                                ds_session_subscription_added,
+                                ds_session_subscription_present
+                            ],
+                            Trace
+                        )
+                    ),
+                    ok
+            end,
+            ?assertMatch([_], IteratorIds),
+            [IteratorId] = IteratorIds,
+            ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end),
+            ExpectedMessages = [Message2, Message3],
+            ?assertEqual(ExpectedMessages, ReplayMessages1),
+            %% Different DS shard
+            ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end),
+            ?assertEqual([], ReplayMessages2),
+            ok
+        end
+    ),
+    ok.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -103,8 +210,11 @@ connect(ClientId, CleanStart, EI) ->
     {ok, _} = emqtt:connect(Client),
     Client.
 
-consume(Shard, Replay) ->
+%% Obtains a storage-layer iterator on `Shard' and hands it to `consume/1'.
+%% The second argument is either a replay spec `{TopicFilter, StartMS}', for which
+%% a fresh iterator is made, or a binary iterator id, for which a previously
+%% preserved iterator is restored.
+consume(Shard, Replay = {_TopicFilter, _StartMS}) ->
     {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
+    consume(It);
+consume(Shard, IteratorId) when is_binary(IteratorId) ->
+    {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId),
     consume(It).
 
 consume(It) ->
@@ -114,3 +224,59 @@ consume(It) ->
         none ->
             []
     end.
+
+receive_messages(Count) ->
+    receive_messages(Count, []).
+
+receive_messages(0, Msgs) ->
+    Msgs;
+receive_messages(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            receive_messages(Count - 1, [Msg | Msgs]);
+        {deliver, _Topic, Msg} ->
+            receive_messages(Count - 1, [Msg | Msgs]);
+        _Other ->
+            receive_messages(Count, Msgs)
+    after 5000 ->
+        Msgs
+    end.
+
+publish(Node, Message) ->
+    erpc:call(Node, emqx, publish, [Message]).
+
+get_iterator_ids(Node, ClientId) ->
+    Channel = erpc:call(Node, fun() ->
+        [ConnPid] = emqx_cm:lookup_channels(ClientId),
+        sys:get_state(ConnPid)
+    end),
+    emqx_connection:info({channel, {session, iterators}}, Channel).
+
+app_specs() ->
+    [
+        emqx_durable_storage,
+        {emqx, #{
+            before_start => fun() ->
+                emqx_app:set_config_loader(?MODULE)
+            end,
+            config => #{persistent_session_store => #{ds => true}},
+            override_env => [{boot_modules, [broker, listeners]}]
+        }}
+    ].
+
+cluster() ->
+    Node1 = persistent_messages_SUITE1,
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(Node1),
+        listeners => true,
+        apps => app_specs()
+    },
+    [
+        {Node1, Spec},
+        {persistent_messages_SUITE2, Spec}
+    ].
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.

+ 22 - 0
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -267,6 +267,8 @@ receive_messages(Count, Msgs) ->
     receive
         {publish, Msg} ->
             receive_messages(Count - 1, [Msg | Msgs]);
+        {deliver, _Topic, Msg} ->
+            receive_messages(Count - 1, [Msg | Msgs]);
         _Other ->
             receive_messages(Count, Msgs)
     after 5000 ->
@@ -373,6 +375,26 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
 do_publish(Payload, PublishFun, WaitForUnregister) ->
     do_publish([Payload], PublishFun, WaitForUnregister).
 
+get_replay_messages(ReplayID) ->
+    DSShard = <<"local">>,
+    case emqx_ds_storage_layer:restore_iterator(DSShard, ReplayID) of
+        {ok, It} ->
+            do_get_replay_messages(It, []);
+        Error ->
+            error({"error restoring iterator", #{error => Error, replay_id => ReplayID}})
+    end.
+
+do_get_replay_messages(It, Acc) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Val, NewIt} ->
+            Msg = erlang:binary_to_term(Val),
+            do_get_replay_messages(NewIt, [Msg | Acc]);
+        none ->
+            {ok, lists:reverse(Acc)};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------

+ 18 - 27
apps/emqx/test/emqx_proper_types.erl

@@ -20,6 +20,7 @@
 
 -include_lib("proper/include/proper.hrl").
 -include("emqx.hrl").
+-include("emqx_session.hrl").
 -include("emqx_access_control.hrl").
 
 %% High level Types
@@ -132,33 +133,23 @@ clientinfo() ->
 sessioninfo() ->
     ?LET(
         Session,
-        {session, clientid(),
-            % id
-            sessionid(),
-            % is_persistent
-            boolean(),
-            % subscriptions
-            subscriptions(),
-            % max_subscriptions
-            non_neg_integer(),
-            % upgrade_qos
-            boolean(),
-            % emqx_inflight:inflight()
-            inflight(),
-            % emqx_mqueue:mqueue()
-            mqueue(),
-            % next_pkt_id
-            packet_id(),
-            % retry_interval
-            safty_timeout(),
-            % awaiting_rel
-            awaiting_rel(),
-            % max_awaiting_rel
-            non_neg_integer(),
-            % await_rel_timeout
-            safty_timeout(),
-            % created_at
-            timestamp()},
+        #session{
+            clientid = clientid(),
+            id = sessionid(),
+            is_persistent = boolean(),
+            subscriptions = subscriptions(),
+            max_subscriptions = non_neg_integer(),
+            upgrade_qos = boolean(),
+            inflight = inflight(),
+            mqueue = mqueue(),
+            next_pkt_id = packet_id(),
+            retry_interval = safty_timeout(),
+            awaiting_rel = awaiting_rel(),
+            max_awaiting_rel = non_neg_integer(),
+            await_rel_timeout = safty_timeout(),
+            created_at = timestamp(),
+            iterators = []
+        },
         emqx_session:info(Session)
     ).
 

+ 53 - 9
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -15,6 +15,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_ds).
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 %% API:
 -export([ensure_shard/2]).
 %%   Messages:
@@ -56,7 +58,7 @@
 
 -type iterator() :: term().
 
--opaque iterator_id() :: binary().
+-type iterator_id() :: binary().
 
 %%-type session() :: #session{}.
 
@@ -73,7 +75,8 @@
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
-%% TODO granularity?
+%% TODO granularity?  Currently, we should always use microseconds, as that's the unit we
+%% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
 %%================================================================================
@@ -129,11 +132,13 @@ session_open(ClientID) ->
             fun() ->
                 case mnesia:read(?SESSION_TAB, ClientID) of
                     [#session{iterators = Iterators}] ->
-                        {false, ClientID, Iterators};
+                        IteratorIDs = maps:values(Iterators),
+                        {false, ClientID, IteratorIDs};
                     [] ->
-                        Session = #session{id = ClientID, iterators = []},
+                        Iterators = #{},
+                        Session = #session{id = ClientID, iterators = Iterators},
                         mnesia:write(?SESSION_TAB, Session, write),
-                        {true, ClientID, []}
+                        {true, ClientID, _IteratorIDs = []}
                 end
             end
         ),
@@ -160,10 +165,38 @@ session_suspend(_SessionId) ->
 
 %% @doc Called when a client subscribes to a topic. Idempotent.
 -spec session_add_iterator(session_id(), emqx_topic:words()) ->
-    {ok, iterator_id()} | {error, session_not_found}.
-session_add_iterator(_SessionId, _TopicFilter) ->
-    %% TODO
-    {ok, <<"">>}.
+    {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}.
+session_add_iterator(DSSessionId, TopicFilter) ->
+    {atomic, Ret} =
+        mria:transaction(
+            ?DS_SHARD,
+            fun() ->
+                case mnesia:wread({?SESSION_TAB, DSSessionId}) of
+                    [] ->
+                        {error, session_not_found};
+                    [#session{iterators = #{TopicFilter := IteratorId}}] ->
+                        StartMS = get_start_ms(IteratorId, DSSessionId),
+                        ?tp(
+                            ds_session_subscription_present,
+                            #{iterator_id => IteratorId, session_id => DSSessionId}
+                        ),
+                        IsNew = false,
+                        {ok, IteratorId, StartMS, IsNew};
+                    [#session{iterators = Iterators0} = Session0] ->
+                        {IteratorId, StartMS} = new_iterator_id(DSSessionId),
+                        Iterators = Iterators0#{TopicFilter => IteratorId},
+                        Session = Session0#session{iterators = Iterators},
+                        mnesia:write(?SESSION_TAB, Session, write),
+                        ?tp(
+                            ds_session_subscription_added,
+                            #{iterator_id => IteratorId, session_id => DSSessionId}
+                        ),
+                        IsNew = true,
+                        {ok, IteratorId, StartMS, IsNew}
+                end
+            end
+        ),
+    Ret.
 
 %% @doc Called when a client unsubscribes from a topic. Returns `true'
 %% if the session contained the subscription or `false' if it wasn't
@@ -201,3 +234,14 @@ iterator_stats() ->
 %%================================================================================
 %% Internal functions
 %%================================================================================
+
+%% Builds an iterator id by appending the current system time (microseconds,
+%% rendered as a decimal string) to the session id. Returns the id together
+%% with that timestamp.
+-spec new_iterator_id(session_id()) -> {iterator_id(), time()}.
+new_iterator_id(DSSessionId) ->
+    NowMS = erlang:system_time(microsecond),
+    IteratorId = <<DSSessionId/binary, (integer_to_binary(NowMS))/binary>>,
+    {IteratorId, NowMS}.
+
+%% Recovers the timestamp embedded in an iterator id: strips the session id
+%% prefix and parses the remainder as a decimal integer. Crashes with
+%% `badmatch' if `IteratorId' does not start with `SessionId'.
+-spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time().
+get_start_ms(IteratorId, SessionId) ->
+    PrefixSize = byte_size(SessionId),
+    <<SessionId:PrefixSize/binary, StartMSBin/binary>> = IteratorId,
+    binary_to_integer(StartMSBin).

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_int.hrl

@@ -21,7 +21,7 @@
 
 -record(session, {
     id :: emqx_ds:session_id(),
-    iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}]
+    iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()}
 }).
 
 -endif.

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_replay.erl

@@ -15,7 +15,7 @@
 -type replay_id() :: binary().
 
 -type replay() :: {
-    _TopicFilter :: emqx_ds:topic(),
+    _TopicFilter :: emqx_ds:words(),
     _StartTime :: emqx_ds:time()
 }.
 

+ 53 - 5
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -13,7 +13,13 @@
 
 -export([make_iterator/2, next/1]).
 
--export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
+-export([
+    preserve_iterator/2,
+    restore_iterator/2,
+    discard_iterator/2,
+    is_iterator_present/2,
+    discard_iterator_prefix/2
+]).
 
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@@ -160,10 +166,10 @@ next(It = #it{module = Mod, data = ItData}) ->
             end
     end.
 
--spec preserve_iterator(iterator(), emqx_ds_replay:replay_id()) ->
+-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) ->
     ok | {error, _TODO}.
-preserve_iterator(It = #it{}, ReplayID) ->
-    iterator_put_state(ReplayID, It).
+preserve_iterator(It = #it{}, IteratorID) ->
+    iterator_put_state(IteratorID, It).
 
 -spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
     {ok, iterator()} | {error, _TODO}.
@@ -177,11 +183,27 @@ restore_iterator(Shard, ReplayID) ->
             Error
     end.
 
--spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
+%% @doc Returns `true' when looking up the iterator state for `ReplayID'
+%% in `Shard' yields `{ok, _}', and `false' for any other lookup result.
+-spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
+    boolean().
+is_iterator_present(Shard, ReplayID) ->
+    %% TODO: use keyMayExist after added to wrapper?
+    Lookup = iterator_get_state(Shard, ReplayID),
+    case Lookup of
+        {ok, _State} -> true;
+        _Other -> false
+    end.
+
+-spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
     ok | {error, _TODO}.
 discard_iterator(Shard, ReplayID) ->
     iterator_delete(Shard, ReplayID).
 
+%% @doc Discards the preserved iterators of `Shard' whose keys start with
+%% `Prefix'. Public entry point; the work is done by the internal
+%% `do_discard_iterator_prefix/2'.
+-spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
+    ok | {error, _TODO}.
+discard_iterator_prefix(Shard, Prefix) ->
+    do_discard_iterator_prefix(Shard, Prefix).
+
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================
@@ -391,6 +413,32 @@ restore_iterator_state(
     It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
     open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
 
+%% Deletes every entry of the shard's iterator column family whose key
+%% starts with `KeyPrefix'.
+%%
+%% Returns `ok' once the scan runs past the prefix (or past the end of the
+%% column family), or the first error reported by rocksdb. The rocksdb
+%% iterator is closed on every exit path, including exceptions.
+do_discard_iterator_prefix(Shard, KeyPrefix) ->
+    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
+    case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of
+        {ok, It} ->
+            try
+                %% Position on the first key >= KeyPrefix, then walk forward.
+                do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, {seek, KeyPrefix})
+            after
+                %% Single place where the iterator is released.
+                ok = rocksdb:iterator_close(It)
+            end;
+        Error ->
+            Error
+    end.
+
+%% Scan loop: moves the iterator with `NextAction' and deletes the key it
+%% lands on while that key still carries `KeyPrefix'. The caller owns (and
+%% closes) the iterator.
+do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) ->
+    PrefixSize = byte_size(KeyPrefix),
+    case rocksdb:iterator_move(It, NextAction) of
+        {ok, <<KeyPrefix:PrefixSize/binary, _/binary>> = K, _V} ->
+            %% Report a failed delete to the caller instead of crashing
+            %% with a badmatch; the spec of the public API allows errors.
+            case rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) of
+                ok ->
+                    do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next);
+                {error, _} = DeleteError ->
+                    DeleteError
+            end;
+        {ok, _K, _V} ->
+            %% First key without the prefix: nothing left to delete.
+            ok;
+        {error, invalid_iterator} ->
+            %% Ran off the end of the column family.
+            ok;
+        Error ->
+            Error
+    end.
+
 %% Functions for dealing with the metadata stored persistently in rocksdb
 
 -define(CURRENT_GEN, <<"current">>).

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria]},

+ 2 - 0
apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl

@@ -14,6 +14,8 @@
 
 -opaque t() :: ets:tid().
 
+-export_type([t/0]).
+
 -spec open() -> t().
 open() ->
     ets:new(?MODULE, [ordered_set, {keypos, 1}]).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -927,7 +927,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
             retry_interval,
             upgrade_qos,
             zone,
-            %% sessionID, defined in emqx_session.erl
+            %% session_id, defined in emqx_session.erl
             id
         ],
     TimesKeys = [created_at, connected_at, disconnected_at],