Просмотр исходного кода

feat(session): store iterator ids in session record

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
9c6dd30f44

+ 5 - 1
apps/emqx/include/emqx_session.hrl

@@ -49,7 +49,11 @@
     %% Awaiting PUBREL Timeout (Unit: millisecond)
     await_rel_timeout :: timeout(),
     %% Created at
-    created_at :: pos_integer()
+    created_at :: pos_integer(),
+    %% Topic filter to iterator ID mapping.
+    %% Note: we shouldn't serialize this when persisting sessions, as this information
+    %% also exists in the `?ITERATOR_REF_TAB' table.
+    iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()}
 }).
 
 -endif.

+ 12 - 2
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -97,6 +97,12 @@ get_all_iterator_ids(Node) ->
         emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
     end).
 
+get_session_iterators(Node, ClientId) ->
+    erpc:call(Node, fun() ->
+        [ConnPid] = emqx_cm:lookup_channels(ClientId),
+        emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid))
+    end).
+
 wait_nodeup(Node) ->
     ?retry(
         _Sleep0 = 500,
@@ -191,14 +197,18 @@ t_session_subscription_idempotency(Config) ->
             {ok, _} = emqtt:connect(Client1),
             ct:pal("subscribing 2"),
             {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
+            SessionIterators = get_session_iterators(Node1, ClientId),
 
             ok = emqtt:stop(Client1),
 
-            ok
+            #{session_iterators => SessionIterators}
         end,
-        fun(Trace) ->
+        fun(Res, Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
+            #{session_iterators := SessionIterators} = Res,
             %% Exactly one iterator should have been opened.
+            ?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}),
+            ?assertMatch(#{SubTopicFilter := _}, SessionIterators),
             ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
             ?assertMatch(
                 {_IsNew = false, ClientId},

+ 10 - 3
apps/emqx/src/emqx_session.erl

@@ -269,7 +269,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
 info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt.
+    CreatedAt;
+info(iterators, #session{iterators = Iterators}) ->
+    Iterators.
 
 %% @doc Get stats of the session.
 -spec stats(session()) -> emqx_types:stats().
@@ -318,8 +320,13 @@ is_subscriptions_full(#session{
 -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
     session().
 add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
-    _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
-    Session.
+    case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
+        {ok, IteratorId, _IsNew} ->
+            Iterators = Session#session.iterators,
+            Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}};
+        _ ->
+            Session
+    end.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: UNSUBSCRIBE