Просмотр исходного кода

chore(ps_ds): make persistent session module use new `emqx_ds` APIs

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
903b3863d1

+ 44 - 81
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -14,7 +14,6 @@
 -define(DEFAULT_KEYSPACE, default).
 -define(DS_SHARD_ID, <<"local">>).
 -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
--define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -91,9 +90,6 @@ get_mqtt_port(Node, Type) ->
     {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
     Port.
 
-get_all_iterator_refs(Node) ->
-    erpc:call(Node, mnesia, dirty_all_keys, [?ITERATOR_REF_TAB]).
-
 get_all_iterator_ids(Node) ->
     Fn = fun(K, _V, Acc) -> [K | Acc] end,
     erpc:call(Node, fun() ->
@@ -126,6 +122,32 @@ start_client(Opts0 = #{}) ->
     on_exit(fun() -> catch emqtt:stop(Client) end),
     Client.
 
+restart_node(Node, NodeSpec) ->
+    ?tp(will_restart_node, #{}),
+    ?tp(notice, "restarting node", #{node => Node}),
+    true = monitor_node(Node, true),
+    ok = erpc:call(Node, init, restart, []),
+    receive
+        {nodedown, Node} ->
+            ok
+    after 10_000 ->
+        ct:fail("node ~p didn't stop", [Node])
+    end,
+    ?tp(notice, "waiting for nodeup", #{node => Node}),
+    wait_nodeup(Node),
+    wait_gen_rpc_down(NodeSpec),
+    ?tp(notice, "restarting apps", #{node => Node}),
+    Apps = maps:get(apps, NodeSpec),
+    ok = erpc:call(Node, emqx_cth_suite, load_apps, [Apps]),
+    _ = erpc:call(Node, emqx_cth_suite, start_apps, [Apps, NodeSpec]),
+    %% have to re-inject this so that we may stop the node succesfully at the
+    %% end....
+    ok = emqx_cth_cluster:set_node_opts(Node, NodeSpec),
+    ok = snabbkaffe:forward_trace(Node),
+    ?tp(notice, "node restarted", #{node => Node}),
+    ?tp(restarted_node, #{}),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -143,24 +165,14 @@ t_non_persistent_session_subscription(_Config) ->
             {ok, _} = emqtt:connect(Client),
             ?tp(notice, "subscribing", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
-            IteratorRefs = get_all_iterator_refs(node()),
-            IteratorIds = get_all_iterator_ids(node()),
 
             ok = emqtt:stop(Client),
 
-            #{
-                iterator_refs => IteratorRefs,
-                iterator_ids => IteratorIds
-            }
+            ok
         end,
-        fun(Res, Trace) ->
+        fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            #{
-                iterator_refs := IteratorRefs,
-                iterator_ids := IteratorIds
-            } = Res,
-            ?assertEqual([], IteratorRefs),
-            ?assertEqual({ok, []}, IteratorIds),
+            ?assertEqual([], ?of_kind(ds_session_subscription_added, Trace)),
             ok
         end
     ),
@@ -175,7 +187,7 @@ t_session_subscription_idempotency(Config) ->
     ?check_trace(
         begin
             ?force_ordering(
-                #{?snk_kind := persistent_session_ds_iterator_added},
+                #{?snk_kind := persistent_session_ds_subscription_added},
                 _NEvents0 = 1,
                 #{?snk_kind := will_restart_node},
                 _Guard0 = true
@@ -187,32 +199,7 @@ t_session_subscription_idempotency(Config) ->
                 _Guard1 = true
             ),
 
-            spawn_link(fun() ->
-                ?tp(will_restart_node, #{}),
-                ?tp(notice, "restarting node", #{node => Node1}),
-                true = monitor_node(Node1, true),
-                ok = erpc:call(Node1, init, restart, []),
-                receive
-                    {nodedown, Node1} ->
-                        ok
-                after 10_000 ->
-                    ct:fail("node ~p didn't stop", [Node1])
-                end,
-                ?tp(notice, "waiting for nodeup", #{node => Node1}),
-                wait_nodeup(Node1),
-                wait_gen_rpc_down(Node1Spec),
-                ?tp(notice, "restarting apps", #{node => Node1}),
-                Apps = maps:get(apps, Node1Spec),
-                ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
-                _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
-                %% have to re-inject this so that we may stop the node succesfully at the
-                %% end....
-                ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
-                ok = snabbkaffe:forward_trace(Node1),
-                ?tp(notice, "node restarted", #{node => Node1}),
-                ?tp(restarted_node, #{}),
-                ok
-            end),
+            spawn_link(fun() -> restart_node(Node1, Node1Spec) end),
 
             ?tp(notice, "starting 1", #{}),
             Client0 = start_client(#{port => Port, clientid => ClientId}),
@@ -223,7 +210,7 @@ t_session_subscription_idempotency(Config) ->
             receive
                 {'EXIT', {shutdown, _}} ->
                     ok
-            after 0 -> ok
+            after 100 -> ok
             end,
             process_flag(trap_exit, false),
 
@@ -240,10 +227,7 @@ t_session_subscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            %% Exactly one iterator should have been opened.
             SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
-            ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
-            ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
             ?assertMatch(
                 {ok, #{}, #{SubTopicFilterWords := #{}}},
                 erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId])
@@ -262,7 +246,10 @@ t_session_unsubscription_idempotency(Config) ->
     ?check_trace(
         begin
             ?force_ordering(
-                #{?snk_kind := persistent_session_ds_close_iterators, ?snk_span := {complete, _}},
+                #{
+                    ?snk_kind := persistent_session_ds_subscription_delete,
+                    ?snk_span := {complete, _}
+                },
                 _NEvents0 = 1,
                 #{?snk_kind := will_restart_node},
                 _Guard0 = true
@@ -270,36 +257,11 @@ t_session_unsubscription_idempotency(Config) ->
             ?force_ordering(
                 #{?snk_kind := restarted_node},
                 _NEvents1 = 1,
-                #{?snk_kind := persistent_session_ds_iterator_delete, ?snk_span := start},
+                #{?snk_kind := persistent_session_ds_subscription_route_delete, ?snk_span := start},
                 _Guard1 = true
             ),
 
-            spawn_link(fun() ->
-                ?tp(will_restart_node, #{}),
-                ?tp(notice, "restarting node", #{node => Node1}),
-                true = monitor_node(Node1, true),
-                ok = erpc:call(Node1, init, restart, []),
-                receive
-                    {nodedown, Node1} ->
-                        ok
-                after 10_000 ->
-                    ct:fail("node ~p didn't stop", [Node1])
-                end,
-                ?tp(notice, "waiting for nodeup", #{node => Node1}),
-                wait_nodeup(Node1),
-                wait_gen_rpc_down(Node1Spec),
-                ?tp(notice, "restarting apps", #{node => Node1}),
-                Apps = maps:get(apps, Node1Spec),
-                ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
-                _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
-                %% have to re-inject this so that we may stop the node succesfully at the
-                %% end....
-                ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
-                ok = snabbkaffe:forward_trace(Node1),
-                ?tp(notice, "node restarted", #{node => Node1}),
-                ?tp(restarted_node, #{}),
-                ok
-            end),
+            spawn_link(fun() -> restart_node(Node1, Node1Spec) end),
 
             ?tp(notice, "starting 1", #{}),
             Client0 = start_client(#{port => Port, clientid => ClientId}),
@@ -312,7 +274,7 @@ t_session_unsubscription_idempotency(Config) ->
             receive
                 {'EXIT', {shutdown, _}} ->
                     ok
-            after 0 -> ok
+            after 100 -> ok
             end,
             process_flag(trap_exit, false),
 
@@ -327,7 +289,7 @@ t_session_unsubscription_idempotency(Config) ->
                 ?wait_async_action(
                     emqtt:unsubscribe(Client1, SubTopicFilter),
                     #{
-                        ?snk_kind := persistent_session_ds_iterator_delete,
+                        ?snk_kind := persistent_session_ds_subscription_route_delete,
                         ?snk_span := {complete, _}
                     },
                     15_000
@@ -339,9 +301,10 @@ t_session_unsubscription_idempotency(Config) ->
         end,
         fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            %% No iterators remaining
-            ?assertEqual([], get_all_iterator_refs(Node1)),
-            ?assertEqual({ok, []}, get_all_iterator_ids(Node1)),
+            ?assertMatch(
+                {ok, #{}, Subs = #{}} when map_size(Subs) =:= 0,
+                erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId])
+            ),
             ok
         end
     ),

+ 1 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -42,7 +42,7 @@ init() ->
     ?WHEN_ENABLED(begin
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}),
         ok = emqx_persistent_session_ds_router:init_tables(),
-        %ok = emqx_persistent_session_ds:create_tables(),
+        ok = emqx_persistent_session_ds:create_tables(),
         ok
     end).
 

+ 132 - 148
apps/emqx/src/emqx_persistent_session_ds.erl_

@@ -65,10 +65,13 @@
 %% Currently, this is the clientid.  We avoid `emqx_types:clientid()' because that can be
 %% an atom, in theory (?).
 -type id() :: binary().
--type iterator() :: emqx_ds:iterator().
--type iterator_id() :: emqx_ds:iterator_id().
 -type topic_filter() :: emqx_ds:topic_filter().
--type iterators() :: #{topic_filter() => iterator()}.
+-type subscription_id() :: {id(), topic_filter()}.
+-type subscription() :: #{
+    start_time := emqx_ds:time(),
+    propts := map(),
+    extra := map()
+}.
 -type session() :: #{
     %% Client ID
     id := id(),
@@ -77,7 +80,7 @@
     %% When the session should expire
     expires_at := timestamp() | never,
     %% Client’s Subscriptions.
-    iterators := #{topic() => iterator()},
+    iterators := #{topic() => subscription()},
     %%
     props := map()
 }.
@@ -90,6 +93,8 @@
 
 -export_type([id/0]).
 
+-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+
 %%
 
 -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
@@ -121,17 +126,17 @@ ensure_session(ClientID, Conf) ->
 
 open_session(ClientID) ->
     case session_open(ClientID) of
-        {ok, Session, Iterators} ->
-            Session#{iterators => prep_iterators(Iterators)};
+        {ok, Session, Subscriptions} ->
+            Session#{iterators => prep_subscriptions(Subscriptions)};
         false ->
             false
     end.
 
-prep_iterators(Iterators) ->
+prep_subscriptions(Subscriptions) ->
     maps:fold(
-        fun(Topic, Iterator, Acc) -> Acc#{emqx_topic:join(Topic) => Iterator} end,
+        fun(Topic, Subscription, Acc) -> Acc#{emqx_topic:join(Topic) => Subscription} end,
         #{},
-        Iterators
+        Subscriptions
     ).
 
 -spec destroy(session() | clientinfo()) -> ok.
@@ -228,7 +233,7 @@ unsubscribe(
 ) when is_map_key(TopicFilter, Iters) ->
     Iterator = maps:get(TopicFilter, Iters),
     SubOpts = maps:get(props, Iterator),
-    ok = del_subscription(TopicFilter, Iterator, ID),
+    ok = del_subscription(TopicFilter, ID),
     {ok, Session#{iterators := maps:remove(TopicFilter, Iters)}, SubOpts};
 unsubscribe(
     _TopicFilter,
@@ -327,91 +332,67 @@ terminate(_Reason, _Session = #{}) ->
 %%--------------------------------------------------------------------
 
 -spec add_subscription(topic(), emqx_types:subopts(), id()) ->
-    emqx_ds:iterator().
+    subscription().
 add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
-    % N.B.: we chose to update the router before adding the subscription to the
-    % session/iterator table.  The reasoning for this is as follows:
-    %
-    % Messages matching this topic filter should start to be persisted as soon as
-    % possible to avoid missing messages.  If this is the first such persistent
-    % session subscription, it's important to do so early on.
-    %
-    % This could, in turn, lead to some inconsistency: if such a route gets
-    % created but the session/iterator data fails to be updated accordingly, we
-    % have a dangling route.  To remove such dangling routes, we may have a
-    % periodic GC process that removes routes that do not have a matching
-    % persistent subscription.  Also, route operations use dirty mnesia
-    % operations, which inherently have room for inconsistencies.
-    %
-    % In practice, we use the iterator reference table as a source of truth,
-    % since it is guarded by a transaction context: we consider a subscription
-    % operation to be successful if it ended up changing this table.  Both router
-    % and iterator information can be reconstructed from this table, if needed.
+    %% N.B.: we chose to update the router before adding the subscription to the
+    %% session/iterator table.  The reasoning for this is as follows:
+    %%
+    %% Messages matching this topic filter should start to be persisted as soon as
+    %% possible to avoid missing messages.  If this is the first such persistent
+    %% session subscription, it's important to do so early on.
+    %%
+    %% This could, in turn, lead to some inconsistency: if such a route gets
+    %% created but the session/iterator data fails to be updated accordingly, we
+    %% have a dangling route.  To remove such dangling routes, we may have a
+    %% periodic GC process that removes routes that do not have a matching
+    %% persistent subscription.  Also, route operations use dirty mnesia
+    %% operations, which inherently have room for inconsistencies.
+    %%
+    %% In practice, we use the iterator reference table as a source of truth,
+    %% since it is guarded by a transaction context: we consider a subscription
+    %% operation to be successful if it ended up changing this table.  Both router
+    %% and iterator information can be reconstructed from this table, if needed.
     ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
     TopicFilter = emqx_topic:words(TopicFilterBin),
-    {ok, Iterator, IsNew} = session_add_iterator(
+    {ok, DSSubExt, IsNew} = session_add_subscription(
         DSSessionID, TopicFilter, SubOpts
     ),
-    Ctx = #{iterator => Iterator, is_new => IsNew},
-    ?tp(persistent_session_ds_iterator_added, Ctx),
-    ?tp_span(
-        persistent_session_ds_open_iterators,
-        Ctx,
-        ok = open_iterator_on_all_shards(TopicFilter, Iterator)
-    ),
-    Iterator.
+    ?tp(persistent_session_ds_subscription_added, #{sub => DSSubExt, is_new => IsNew}),
+    %% we'll list streams and open iterators when implementing message replay.
+    DSSubExt.
 
--spec update_subscription(topic(), iterator(), emqx_types:subopts(), id()) ->
-    iterator().
-update_subscription(TopicFilterBin, Iterator, SubOpts, DSSessionID) ->
+-spec update_subscription(topic(), subscription(), emqx_types:subopts(), id()) ->
+    subscription().
+update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) ->
     TopicFilter = emqx_topic:words(TopicFilterBin),
-    {ok, NIterator, false} = session_add_iterator(
+    {ok, NDSSubExt, false} = session_add_subscription(
         DSSessionID, TopicFilter, SubOpts
     ),
-    ok = ?tp(persistent_session_ds_iterator_updated, #{iterator => Iterator}),
-    NIterator.
-
--spec open_iterator_on_all_shards(emqx_types:words(), emqx_ds:iterator()) -> ok.
-open_iterator_on_all_shards(TopicFilter, Iterator) ->
-    ?tp(persistent_session_ds_will_open_iterators, #{iterator => Iterator}),
-    %% Note: currently, shards map 1:1 to nodes, but this will change in the future.
-    Nodes = emqx:running_nodes(),
-    Results = emqx_persistent_session_ds_proto_v1:open_iterator(
-        Nodes,
-        TopicFilter,
-        maps:get(start_time, Iterator),
-        maps:get(id, Iterator)
-    ),
-    %% TODO
-    %% 1. Handle errors.
-    %% 2. Iterator handles are rocksdb resources, it's doubtful they survive RPC.
-    %%    Even if they do, we throw them away here anyway. All in all, we probably should
-    %%    hold each of them in a process on the respective node.
-    true = lists:all(fun(Res) -> element(1, Res) =:= ok end, Results),
-    ok.
-
-%% RPC target.
--spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}.
-do_open_iterator(TopicFilter, StartMS, _IteratorID) ->
-    %% TODO: wrong
-    {ok, emqx_ds:make_iterator(TopicFilter, StartMS)}.
+    ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}),
+    NDSSubExt.
 
--spec del_subscription(topic(), iterator(), id()) ->
+-spec del_subscription(topic(), id()) ->
     ok.
-del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) ->
-    % N.B.: see comments in `?MODULE:add_subscription' for a discussion about the
-    % order of operations here.
+del_subscription(TopicFilterBin, DSSessionId) ->
     TopicFilter = emqx_topic:words(TopicFilterBin),
-    ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID).
+    ?tp_span(
+        persistent_session_ds_subscription_delete,
+        #{session_id => DSSessionId},
+        ok = session_del_subscription(DSSessionId, TopicFilter)
+    ),
+    ?tp_span(
+        persistent_session_ds_subscription_route_delete,
+        #{session_id => DSSessionId},
+        ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId)
+    ).
 
 %%--------------------------------------------------------------------
 %% Session tables operations
 %%--------------------------------------------------------------------
 
 -define(SESSION_TAB, emqx_ds_session).
--define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
--define(DS_MRIA_SHARD, emqx_ds_shard).
+-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
+-define(DS_MRIA_SHARD, emqx_ds_session_shard).
 
 -record(session, {
     %% same as clientid
@@ -423,12 +404,13 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) ->
     props = #{} :: map()
 }).
 
--record(iterator_ref, {
-    ref_id :: {id(), emqx_ds:topic_filter()},
-    it_id :: emqx_ds:iterator_id(),
+-record(ds_sub, {
+    id :: subscription_id(),
     start_time :: emqx_ds:time(),
-    props = #{} :: map()
+    props = #{} :: map(),
+    extra = #{} :: map()
 }).
+-type ds_sub() :: #ds_sub{}.
 
 create_tables() ->
     ok = mria:create_table(
@@ -442,15 +424,16 @@ create_tables() ->
         ]
     ),
     ok = mria:create_table(
-        ?ITERATOR_REF_TAB,
+        ?SESSION_SUBSCRIPTIONS_TAB,
         [
             {rlog_shard, ?DS_MRIA_SHARD},
             {type, ordered_set},
             {storage, storage()},
-            {record_name, iterator_ref},
-            {attributes, record_info(fields, iterator_ref)}
+            {record_name, ds_sub},
+            {attributes, record_info(fields, ds_sub)}
         ]
     ),
+    ok = mria:wait_for_tables([?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB]),
     ok.
 
 -dialyzer({nowarn_function, storage/0}).
@@ -471,26 +454,26 @@ storage() ->
 %% Note: session API doesn't handle session takeovers, it's the job of
 %% the broker.
 -spec session_open(id()) ->
-    {ok, session(), iterators()} | false.
+    {ok, session(), #{topic() => subscription()}} | false.
 session_open(SessionId) ->
     transaction(fun() ->
         case mnesia:read(?SESSION_TAB, SessionId, write) of
             [Record = #session{}] ->
-                Session = export_record(Record),
-                IteratorRefs = session_read_iterators(SessionId),
-                Iterators = export_iterators(IteratorRefs),
-                {ok, Session, Iterators};
+                Session = export_session(Record),
+                DSSubs = session_read_subscriptions(SessionId),
+                Subscriptions = export_subscriptions(DSSubs),
+                {ok, Session, Subscriptions};
             [] ->
                 false
         end
     end).
 
 -spec session_ensure_new(id(), _Props :: map()) ->
-    {ok, session(), iterators()}.
+    {ok, session(), #{topic() => subscription()}}.
 session_ensure_new(SessionId, Props) ->
     transaction(fun() ->
-        ok = session_drop_iterators(SessionId),
-        Session = export_record(session_create(SessionId, Props)),
+        ok = session_drop_subscriptions(SessionId),
+        Session = export_session(session_create(SessionId, Props)),
         {ok, Session, #{}}
     end).
 
@@ -510,80 +493,80 @@ session_create(SessionId, Props) ->
 session_drop(DSSessionId) ->
     transaction(fun() ->
         %% TODO: ensure all iterators from this clientid are closed?
-        ok = session_drop_iterators(DSSessionId),
+        ok = session_drop_subscriptions(DSSessionId),
         ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
     end).
 
-session_drop_iterators(DSSessionId) ->
-    IteratorRefs = session_read_iterators(DSSessionId),
-    ok = lists:foreach(fun session_del_iterator/1, IteratorRefs).
+session_drop_subscriptions(DSSessionId) ->
+    IteratorRefs = session_read_subscriptions(DSSessionId),
+    ok = lists:foreach(fun session_del_subscription/1, IteratorRefs).
 
 %% @doc Called when a client subscribes to a topic. Idempotent.
--spec session_add_iterator(id(), topic_filter(), _Props :: map()) ->
-    {ok, iterator(), _IsNew :: boolean()}.
-session_add_iterator(DSSessionId, TopicFilter, Props) ->
-    IteratorRefId = {DSSessionId, TopicFilter},
+-spec session_add_subscription(id(), topic_filter(), _Props :: map()) ->
+    {ok, subscription(), _IsNew :: boolean()}.
+session_add_subscription(DSSessionId, TopicFilter, Props) ->
+    DSSubId = {DSSessionId, TopicFilter},
     transaction(fun() ->
-        case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
+        case mnesia:read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write) of
             [] ->
-                IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props),
-                Iterator = export_record(IteratorRef),
+                DSSub = session_insert_subscription(DSSessionId, TopicFilter, Props),
+                DSSubExt = export_subscription(DSSub),
                 ?tp(
                     ds_session_subscription_added,
-                    #{iterator => Iterator, session_id => DSSessionId}
+                    #{sub => DSSubExt, session_id => DSSessionId}
                 ),
-                {ok, Iterator, _IsNew = true};
-            [#iterator_ref{} = IteratorRef] ->
-                NIteratorRef = session_update_iterator(IteratorRef, Props),
-                NIterator = export_record(NIteratorRef),
+                {ok, DSSubExt, _IsNew = true};
+            [#ds_sub{} = DSSub] ->
+                NDSSub = session_update_subscription(DSSub, Props),
+                NDSSubExt = export_subscription(NDSSub),
                 ?tp(
                     ds_session_subscription_present,
-                    #{iterator => NIterator, session_id => DSSessionId}
+                    #{sub => NDSSubExt, session_id => DSSessionId}
                 ),
-                {ok, NIterator, _IsNew = false}
+                {ok, NDSSubExt, _IsNew = false}
         end
     end).
 
-session_insert_iterator(DSSessionId, TopicFilter, Props) ->
-    {IteratorId, StartMS} = new_iterator_id(DSSessionId),
-    IteratorRef = #iterator_ref{
-        ref_id = {DSSessionId, TopicFilter},
-        it_id = IteratorId,
+-spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub().
+session_insert_subscription(DSSessionId, TopicFilter, Props) ->
+    {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter),
+    DSSub = #ds_sub{
+        id = DSSubId,
         start_time = StartMS,
-        props = Props
+        props = Props,
+        extra = #{}
     },
-    ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write),
-    IteratorRef.
-
-session_update_iterator(IteratorRef, Props) ->
-    NIteratorRef = IteratorRef#iterator_ref{props = Props},
-    ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write),
-    NIteratorRef.
-
-%% @doc Called when a client unsubscribes from a topic.
--spec session_del_iterator(id(), topic_filter()) -> ok.
-session_del_iterator(DSSessionId, TopicFilter) ->
-    IteratorRefId = {DSSessionId, TopicFilter},
+    ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, DSSub, write),
+    DSSub.
+
+-spec session_update_subscription(ds_sub(), map()) -> ds_sub().
+session_update_subscription(DSSub, Props) ->
+    NDSSub = DSSub#ds_sub{props = Props},
+    ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, NDSSub, write),
+    NDSSub.
+
+session_del_subscription(DSSessionId, TopicFilter) ->
+    DSSubId = {DSSessionId, TopicFilter},
     transaction(fun() ->
-        mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write)
+        mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write)
     end).
 
-session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) ->
-    mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write).
+session_del_subscription(#ds_sub{id = DSSubId}) ->
+    mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write).
 
-session_read_iterators(DSSessionId) ->
+session_read_subscriptions(DSSessionId) ->
     % NOTE: somewhat convoluted way to trick dialyzer
-    Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [
-        {1, iterator_ref},
-        {#iterator_ref.ref_id, {DSSessionId, '_'}}
+    Pat = erlang:make_tuple(record_info(size, ds_sub), '_', [
+        {1, ds_sub},
+        {#ds_sub.id, {DSSessionId, '_'}}
     ]),
-    mnesia:match_object(?ITERATOR_REF_TAB, Pat, read).
+    mnesia:match_object(?SESSION_SUBSCRIPTIONS_TAB, Pat, read).
 
--spec new_iterator_id(id()) -> {iterator_id(), emqx_ds:time()}.
-new_iterator_id(DSSessionId) ->
+-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}.
+new_subscription_id(DSSessionId, TopicFilter) ->
     NowMS = erlang:system_time(microsecond),
-    IteratorId = <<DSSessionId/binary, (emqx_guid:gen())/binary>>,
-    {IteratorId, NowMS}.
+    DSSubId = {DSSessionId, TopicFilter},
+    {DSSubId, NowMS}.
 
 %%--------------------------------------------------------------------------------
 
@@ -593,19 +576,20 @@ transaction(Fun) ->
 
 %%--------------------------------------------------------------------------------
 
-export_iterators(IteratorRefs) ->
+export_subscriptions(DSSubs) ->
     lists:foldl(
-        fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) ->
-            Acc#{TopicFilter => export_record(IteratorRef)}
+        fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) ->
+            Acc#{TopicFilter => export_subscription(DSSub)}
         end,
         #{},
-        IteratorRefs
+        DSSubs
     ).
 
-export_record(#session{} = Record) ->
-    export_record(Record, #session.id, [id, created_at, expires_at, props], #{});
-export_record(#iterator_ref{} = Record) ->
-    export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}).
+export_session(#session{} = Record) ->
+    export_record(Record, #session.id, [id, created_at, expires_at, props], #{}).
+
+export_subscription(#ds_sub{} = Record) ->
+    export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}).
 
 export_record(Record, I, [Field | Rest], Acc) ->
     export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)});

+ 18 - 48
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -29,6 +29,7 @@
 -define(DEFAULT_KEYSPACE, default).
 -define(DS_SHARD_ID, <<"local">>).
 -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
+-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -62,6 +63,7 @@ end_per_testcase(t_session_subscription_iterators, Config) ->
 end_per_testcase(_TestCase, Config) ->
     Apps = ?config(apps, Config),
     emqx_common_test_helpers:call_janitor(60_000),
+    clear_db(),
     emqx_cth_suite:stop(Apps),
     ok.
 
@@ -96,7 +98,7 @@ t_messages_persisted(_Config) ->
 
     ct:pal("Results = ~p", [Results]),
 
-    Persisted = consume(?DS_SHARD, {['#'], 0}),
+    Persisted = consume(['#'], 0),
 
     ct:pal("Persisted = ~p", [Persisted]),
 
@@ -139,7 +141,7 @@ t_messages_persisted_2(_Config) ->
     {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
         emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1),
 
-    Persisted = consume(?DS_SHARD, {['#'], 0}),
+    Persisted = consume(['#'], 0),
 
     ct:pal("Persisted = ~p", [Persisted]),
 
@@ -155,7 +157,7 @@ t_messages_persisted_2(_Config) ->
 
 %% TODO: test quic and ws too
 t_session_subscription_iterators(Config) ->
-    [Node1, Node2] = ?config(nodes, Config),
+    [Node1, _Node2] = ?config(nodes, Config),
     Port = get_mqtt_port(Node1, tcp),
     Topic = <<"t/topic">>,
     SubTopicFilter = <<"t/+">>,
@@ -202,11 +204,8 @@ t_session_subscription_iterators(Config) ->
                 messages => [Message1, Message2, Message3, Message4]
             }
         end,
-        fun(Results, Trace) ->
+        fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            #{
-                messages := [_Message1, Message2, Message3 | _]
-            } = Results,
             case ?of_kind(ds_session_subscription_added, Trace) of
                 [] ->
                     %% Since `emqx_durable_storage' is a dependency of `emqx', it gets
@@ -228,17 +227,6 @@ t_session_subscription_iterators(Config) ->
                     ),
                     ok
             end,
-            ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
-            {ok, [IteratorId]} = get_all_iterator_ids(Node1),
-            ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)),
-            ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end),
-            ExpectedMessages = [Message2, Message3],
-            %% Note: it is expected that this will break after replayers are in place.
-            %% They might have consumed all the messages by this time.
-            ?assertEqual(ExpectedMessages, ReplayMessages1),
-            %% Different DS shard
-            ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end),
-            ?assertEqual([], ReplayMessages2),
             ok
         end
     ),
@@ -263,33 +251,21 @@ connect(Opts0 = #{}) ->
     {ok, _} = emqtt:connect(Client),
     Client.
 
-consume(Shard, Replay = {_TopicFiler, _StartMS}) ->
-    {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
-    consume(It);
-consume(Shard, IteratorId) when is_binary(IteratorId) ->
-    {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId),
+consume(TopicFiler, StartMS) ->
+    [{_, Stream}] = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS),
+    {ok, It} = emqx_ds:make_iterator(Stream, StartMS),
     consume(It).
 
 consume(It) ->
-    case emqx_ds_storage_layer:next(It) of
-        {ok, NIt, [Msg]} ->
-            [emqx_persistent_message:deserialize(Msg) | consume(NIt)];
-        end_of_stream ->
+    case emqx_ds:next(It, 100) of
+        {ok, _NIt, _Msgs = []} ->
+            [];
+        {ok, NIt, Msgs} ->
+            Msgs ++ consume(NIt);
+        {ok, end_of_stream} ->
             []
     end.
 
-delete_all_messages() ->
-    Persisted = consume(?DS_SHARD, {['#'], 0}),
-    lists:foreach(
-        fun(Msg) ->
-            GUID = emqx_message:id(Msg),
-            Topic = emqx_topic:words(emqx_message:topic(Msg)),
-            Timestamp = emqx_guid:timestamp(GUID),
-            ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic)
-        end,
-        Persisted
-    ).
-
 receive_messages(Count) ->
     receive_messages(Count, []).
 
@@ -306,13 +282,6 @@ receive_messages(Count, Msgs) ->
 publish(Node, Message) ->
     erpc:call(Node, emqx, publish, [Message]).
 
-get_iterator_ids(Node, ClientId) ->
-    Channel = erpc:call(Node, fun() ->
-        [ConnPid] = emqx_cm:lookup_channels(ClientId),
-        sys:get_state(ConnPid)
-    end),
-    emqx_connection:info({channel, {session, iterators}}, Channel).
-
 app_specs() ->
     [
         emqx_durable_storage,
@@ -330,5 +299,6 @@ get_mqtt_port(Node, Type) ->
     {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
     Port.
 
-get_all_iterator_ids(Node) ->
-    erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]).
+clear_db() ->
+    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
+    ok.

+ 5 - 5
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -48,19 +48,19 @@
 %% level.
 %%
 %% TODO: currently the stream is hardwired to only support the
-%% internal rocksdb storage. In t he future we want to add another
+%% internal rocksdb storage. In the future we want to add another
 %% implementations for emqx_ds, so this type has to take this into
 %% account.
 -record(stream, {
     shard :: emqx_ds_replication_layer:shard_id(),
-    enc :: emqx_ds_replication_layer:stream()
+    enc :: emqx_ds_storage_layer:stream()
 }).
 
--opaque stream() :: stream().
+-opaque stream() :: #stream{}.
 
 -record(iterator, {
     shard :: emqx_ds_replication_layer:shard_id(),
-    enc :: enqx_ds_replication_layer:iterator()
+    enc :: enqx_ds_storage_layer:iterator()
 }).
 
 -opaque iterator() :: #iterator{}.
@@ -154,7 +154,7 @@ next(Iter0, BatchSize) ->
     %% messages on the receiving node, hence saving some network.
     %%
     %% This kind of trickery should be probably done here in the
-    %% replication layer. Or, perhaps, in the logic lary.
+    %% replication layer. Or, perhaps, in the logic layer.
     case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = #iterator{shard = Shard, enc = StorageIter},