Procházet zdrojové kódy

fix(session): remove recently added `iterators` field from `#session{}` record

Fixes https://emqx.atlassian.net/browse/EMQX-10945

This could lead to `badrecord` errors being raised if a takeover were to happen during a
rolling cluster upgrade, as the old nodes could receive a record with more fields than
expected.
Thales Macedo Garitezi před 2 roky
rodič
revize
063913f245

+ 1 - 5
apps/emqx/include/emqx_session.hrl

@@ -49,11 +49,7 @@
     %% Awaiting PUBREL Timeout (Unit: millisecond)
     await_rel_timeout :: timeout(),
     %% Created at
-    created_at :: pos_integer(),
-    %% Topic filter to iterator ID mapping.
-    %% Note: we shouldn't serialize this when persisting sessions, as this information
-    %% also exists in the `?ITERATOR_REF_TAB' table.
-    iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()}
+    created_at :: pos_integer()
 }).
 
 -endif.

+ 4 - 17
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -108,12 +108,6 @@ get_all_iterator_ids(Node) ->
         emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
     end).
 
-get_session_iterators(Node, ClientId) ->
-    erpc:call(Node, fun() ->
-        [ConnPid] = emqx_cm:lookup_channels(ClientId),
-        emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid))
-    end).
-
 wait_nodeup(Node) ->
     ?retry(
         _Sleep0 = 500,
@@ -209,18 +203,14 @@ t_session_subscription_idempotency(Config) ->
             {ok, _} = emqtt:connect(Client1),
             ct:pal("subscribing 2"),
             {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
-            SessionIterators = get_session_iterators(Node1, ClientId),
 
             ok = emqtt:stop(Client1),
 
-            #{session_iterators => SessionIterators}
+            ok
         end,
-        fun(Res, Trace) ->
+        fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            #{session_iterators := SessionIterators} = Res,
             %% Exactly one iterator should have been opened.
-            ?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}),
-            ?assertMatch(#{SubTopicFilter := _}, SessionIterators),
             SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
             ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
             ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
@@ -321,17 +311,14 @@ t_session_unsubscription_idempotency(Config) ->
                     },
                     15_000
                 ),
-            SessionIterators = get_session_iterators(Node1, ClientId),
 
             ok = emqtt:stop(Client1),
 
-            #{session_iterators => SessionIterators}
+            ok
         end,
-        fun(Res, Trace) ->
+        fun(Trace) ->
             ct:pal("trace:\n  ~p", [Trace]),
-            #{session_iterators := SessionIterators} = Res,
             %% No iterators remaining
-            ?assertEqual(#{}, SessionIterators),
             ?assertEqual([], get_all_iterator_refs(Node1)),
             ?assertEqual({ok, []}, get_all_iterator_ids(Node1)),
             ok

+ 15 - 10
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -24,7 +24,7 @@
     persist_message/1,
     open_session/1,
     add_subscription/2,
-    del_subscription/3
+    del_subscription/2
 ]).
 
 -export([
@@ -139,21 +139,26 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
     {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
     ok.
 
--spec del_subscription(emqx_ds:iterator_id() | undefined, emqx_types:topic(), emqx_ds:session_id()) ->
+-spec del_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
     ok | {skipped, disabled}.
-del_subscription(IteratorID, TopicFilterBin, DSSessionID) ->
+del_subscription(TopicFilterBin, DSSessionID) ->
     ?WHEN_ENABLED(
         begin
             TopicFilter = emqx_topic:words(TopicFilterBin),
-            Ctx = #{iterator_id => IteratorID},
-            ?tp_span(
-                persistent_session_ds_close_iterators,
-                Ctx,
-                ok = ensure_iterator_closed_on_all_shards(IteratorID)
-            ),
+            case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
+                {error, not_found} ->
+                    %% already gone
+                    ok;
+                {ok, IteratorID} ->
+                    ?tp_span(
+                        persistent_session_ds_close_iterators,
+                        #{iterator_id => IteratorID},
+                        ok = ensure_iterator_closed_on_all_shards(IteratorID)
+                    )
+            end,
             ?tp_span(
                 persistent_session_ds_iterator_delete,
-                Ctx,
+                #{},
                 emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
             )
         end

+ 5 - 19
apps/emqx/src/emqx_session.erl

@@ -269,9 +269,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
 info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt;
-info(iterators, #session{iterators = Iterators}) ->
-    Iterators.
+    CreatedAt.
 
 %% @doc Get stats of the session.
 -spec stats(session()) -> emqx_types:stats().
@@ -320,13 +318,8 @@ is_subscriptions_full(#session{
 -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
     session().
 add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
-    case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
-        {ok, IteratorId, _IsNew} ->
-            Iterators = Session#session.iterators,
-            Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}};
-        _ ->
-            Session
-    end.
+    _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
+    Session.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: UNSUBSCRIBE
@@ -356,15 +349,8 @@ unsubscribe(
 -spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) ->
     session().
 remove_persistent_subscription(Session, TopicFilterBin, ClientId) ->
-    Iterators = Session#session.iterators,
-    case maps:get(TopicFilterBin, Iterators, undefined) of
-        undefined ->
-            ok;
-        IteratorId ->
-            _ = emqx_persistent_session_ds:del_subscription(IteratorId, TopicFilterBin, ClientId),
-            ok
-    end,
-    Session#session{iterators = maps:remove(TopicFilterBin, Iterators)}.
+    _ = emqx_persistent_session_ds:del_subscription(TopicFilterBin, ClientId),
+    Session.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBLISH