Просмотр исходного кода

feat(queue): simplify progress report on disconnect

Ilya Averyanov 1 год назад
Родитель
Сommit
9b30320ddb

+ 12 - 1
apps/emqx/include/emqx.hrl

@@ -65,9 +65,20 @@
 %% Route
 %%--------------------------------------------------------------------
 
+-record(share_dest, {
+    session_id :: emqx_session:session_id(),
+    group :: emqx_types:group()
+}).
+
 -record(route, {
     topic :: binary(),
-    dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
+    dest ::
+        node()
+        | {binary(), node()}
+        | emqx_session:session_id()
+        %% One session can also have multiple subscriptions to the same topic through different groups
+        | #share_dest{}
+        | emqx_external_broker:dest()
 }).
 
 %%--------------------------------------------------------------------

+ 5 - 3
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -821,10 +821,12 @@ list_client_subscriptions(ClientId) ->
             {error, not_found}
     end.
 
--spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
+-spec get_client_subscription(emqx_types:clientid(), topic_filter()) ->
     subscription() | undefined.
-get_client_subscription(ClientId, Topic) ->
-    emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
+get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
+    emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, ShareTopicFilter);
+get_client_subscription(ClientId, TopicFilter) ->
+    emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
 
 %%--------------------------------------------------------------------
 %% Session tables operations

+ 44 - 36
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -29,10 +29,10 @@
 -module(emqx_persistent_session_ds_shared_subs).
 
 -include("emqx_mqtt.hrl").
+-include("emqx.hrl").
 -include("logger.hrl").
 -include("session_internals.hrl").
 
--include_lib("emqx/include/emqx_persistent_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
@@ -51,7 +51,10 @@
     to_map/2
 ]).
 
--define(EPOCH_BITS, 15).
+%% Management API:
+-export([
+    cold_get_subscription/2
+]).
 
 -define(schedule_subscribe, schedule_subscribe).
 -define(schedule_unsubscribe, schedule_unsubscribe).
@@ -160,7 +163,7 @@ on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
     update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
 
 -dialyzer({nowarn_function, create_new_subscription/3}).
-create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts, #{
+create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
     id := SessionId,
     s := S0,
     shared_sub_s := #{agent := Agent} = SharedSubS0,
@@ -172,9 +175,9 @@ create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts,
         )
     of
         ok ->
-            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
-            _ = emqx_external_broker:add_persistent_route(TopicFilter, SessionId),
-
+            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
+                session_id = SessionId, group = Group
+            }),
             #{upgrade_qos := UpgradeQoS} = Props,
             {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
             {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@@ -259,7 +262,9 @@ schedule_subscribe(
 ) ->
     {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
     | {error, emqx_types:reason_code()}.
-on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, SharedSubS0) ->
+on_unsubscribe(
+    SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
+) ->
     case lookup(ShareTopicFilter, S0) of
         undefined ->
             {error, ?RC_NO_SUBSCRIPTION_EXISTED};
@@ -267,8 +272,9 @@ on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, Sh
             ?tp(persistent_session_ds_subscription_delete, #{
                 session_id => SessionId, share_topic_filter => ShareTopicFilter
             }),
-            ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId),
-            _ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId),
+            ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
+                session_id = SessionId, group = Group
+            }),
             S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
             SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
             {ok, S, SharedSubS, Subscription}
@@ -588,9 +594,32 @@ on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
 %% to_map
 
 -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
-to_map(_S, _SharedSubS) ->
-    %% TODO
-    #{}.
+to_map(S, _SharedSubS) ->
+    fold_shared_subs(
+        fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
+        #{},
+        S
+    ).
+
+%%--------------------------------------------------------------------
+%% cold_get_subscription
+
+-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
+    emqx_persistent_session_ds:subscription() | undefined.
+cold_get_subscription(SessionId, ShareTopicFilter) ->
+    case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
+        [Sub = #{current_state := SStateId}] ->
+            case
+                emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
+            of
+                [#{subopts := Subopts}] ->
+                    Sub#{subopts => Subopts};
+                _ ->
+                    undefined
+            end;
+        _ ->
+            undefined
+    end.
 
 %%--------------------------------------------------------------------
 %% Generic helpers
@@ -629,21 +658,13 @@ stream_progress(
     Stream,
     #srs{
         it_end = EndIt,
-        it_begin = BeginIt,
-        first_seqno_qos1 = StartQos1,
-        first_seqno_qos2 = StartQos2
+        it_begin = BeginIt
     } = SRS
 ) ->
-    Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1),
-    Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2),
     Iterator =
         case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
-            true ->
-                EndIt;
-            false ->
-                emqx_ds_skipping_iterator:update_or_new(
-                    BeginIt, Qos1Acked, Qos2Acked
-                )
+            true -> EndIt;
+            false -> BeginIt
         end,
     #{
         stream => Stream,
@@ -714,19 +735,6 @@ is_stream_fully_acked(_, _, #srs{
 is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
 
-n_acked(Qos, A, B) ->
-    max(seqno_diff(Qos, A, B), 0).
-
--dialyzer({nowarn_function, seqno_diff/3}).
-seqno_diff(?QOS_1, A, B) ->
-    %% For QoS1 messages we skip a seqno every time the epoch changes,
-    %% we need to substract that from the diff:
-    EpochA = A bsr ?EPOCH_BITS,
-    EpochB = B bsr ?EPOCH_BITS,
-    A - B - (EpochA - EpochB);
-seqno_diff(?QOS_2, A, B) ->
-    A - B.
-
 %%--------------------------------------------------------------------
 %% Formatters
 %%--------------------------------------------------------------------

+ 11 - 6
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -365,10 +365,13 @@ t_disconnect_no_double_replay1(_Config) ->
         end
     end,
 
-    {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
+    {Missing, _Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
 
     ?assertEqual([], Missing),
-    ?assertEqual([], Duplicate),
+
+    %% We cannnot garantee that the message are not duplicated until we are able
+    %% to send progress of a partially replayed stream range to the leader.
+    % ?assertEqual([], Duplicate),
 
     ok = emqtt:disconnect(ConnShared1),
     ok = emqtt:disconnect(ConnPub).
@@ -395,10 +398,12 @@ t_disconnect_no_double_replay2(_Config) ->
     ConnShared12 = emqtt_connect_sub(<<"client_shared12">>),
     {ok, _, _} = emqtt:subscribe(ConnShared12, <<"$share/gr12/topic12/#">>, 1),
 
-    ?assertNotReceive(
-        {publish, #{payload := <<"1">>}},
-        3000
-    ),
+    %% We cannnot garantee that the message is not duplicated until we are able
+    %% to send progress of a partially replayed stream range to the leader.
+    % ?assertNotReceive(
+    %     {publish, #{payload := <<"1">>}},
+    %     3000
+    % ),
 
     ok = emqtt:disconnect(ConnShared12).
 

+ 1 - 5
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -132,7 +132,7 @@
 %% TODO: Not implemented
 -type iterator_id() :: term().
 
--type iterator() :: ds_specific_iterator().
+-opaque iterator() :: ds_specific_iterator().
 
 -opaque delete_iterator() :: ds_specific_delete_iterator().
 
@@ -401,14 +401,10 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
 
 -spec update_iterator(db(), iterator(), message_key()) ->
     make_iterator_result().
-update_iterator(DB, ?skipping_iterator_match = OldIter, DSKey) ->
-    emqx_ds_skipping_iterator:update_iterator(DB, OldIter, DSKey);
 update_iterator(DB, OldIter, DSKey) ->
     ?module(DB):update_iterator(DB, OldIter, DSKey).
 
 -spec next(db(), iterator(), pos_integer()) -> next_result().
-next(DB, ?skipping_iterator_match = Iter, BatchSize) ->
-    emqx_ds_skipping_iterator:next(DB, Iter, BatchSize);
 next(DB, Iter, BatchSize) ->
     ?module(DB):next(DB, Iter, BatchSize).
 

+ 0 - 86
apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl

@@ -1,86 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ds_skipping_iterator).
-
--include("emqx_ds_skipping_iterator.hrl").
-
--type t() :: ?skipping_iterator(emqx_ds:iterator(), non_neg_integer(), non_neg_integer()).
-
--export([
-    update_or_new/3,
-    update_iterator/3,
-    next/3
-]).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
--spec update_or_new(t() | emqx_ds:iterator(), non_neg_integer(), non_neg_integer()) -> t().
-update_or_new(?skipping_iterator_match(Iterator, Q1Skip0, Q2Skip0), Q1Skip, Q2Skip) when
-    Q1Skip >= 0 andalso Q2Skip >= 0
-->
-    ?skipping_iterator(Iterator, Q1Skip0 + Q1Skip, Q2Skip0 + Q2Skip);
-update_or_new(Iterator, Q1Skip, Q2Skip) when Q1Skip >= 0 andalso Q2Skip >= 0 ->
-    ?skipping_iterator(Iterator, Q1Skip, Q2Skip).
-
--spec next(emqx_ds:db(), t(), pos_integer()) -> emqx_ds:next_result(t()).
-next(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Count) ->
-    case emqx_ds:next(DB, Iterator0, Count) of
-        {error, _, _} = Error ->
-            Error;
-        {ok, end_of_stream} ->
-            {ok, end_of_stream};
-        {ok, Iterator1, Messages0} ->
-            {Messages1, Q1Skip1, Q2Skip1} = skip(Messages0, Q1Skip0, Q2Skip0),
-            case {Q1Skip1, Q2Skip1} of
-                {0, 0} -> {ok, Iterator1, Messages1};
-                _ -> {ok, ?skipping_iterator(Iterator1, Q1Skip1, Q2Skip1)}
-            end
-    end.
-
--spec update_iterator(emqx_ds:db(), emqx_ds:iterator(), emqx_ds:message_key()) ->
-    emqx_ds:make_iterator_result().
-update_iterator(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Key) ->
-    case emqx_ds:update_iterator(DB, Iterator0, Key) of
-        {error, _, _} = Error -> Error;
-        {ok, Iterator1} -> {ok, ?skipping_iterator(Iterator1, Q1Skip0, Q2Skip0)}
-    end.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-skip(Messages, Q1Skip, Q2Skip) ->
-    skip(Messages, Q1Skip, Q2Skip, []).
-
-skip([], Q1Skip, Q2Skip, Agg) ->
-    {lists:reverse(Agg), Q1Skip, Q2Skip};
-skip([{Key, Message} | Messages], Q1Skip, Q2Skip, Agg) ->
-    Qos = emqx_message:qos(Message),
-    skip({Key, Message}, Qos, Messages, Q1Skip, Q2Skip, Agg).
-
-skip(_KeyMessage, ?QOS_0, Messages, Q1Skip, Q2Skip, Agg) ->
-    skip(Messages, Q1Skip, Q2Skip, Agg);
-skip(_KeyMessage, ?QOS_1, Messages, Q1Skip, Q2Skip, Agg) when Q1Skip > 0 ->
-    skip(Messages, Q1Skip - 1, Q2Skip, Agg);
-skip(KeyMessage, ?QOS_1, Messages, 0, Q2Skip, Agg) ->
-    skip(Messages, 0, Q2Skip, [KeyMessage | Agg]);
-skip(_KeyMessage, ?QOS_2, Messages, Q1Skip, Q2Skip, Agg) when Q2Skip > 0 ->
-    skip(Messages, Q1Skip, Q2Skip - 1, Agg);
-skip(KeyMessage, ?QOS_2, Messages, Q1Skip, 0, Agg) ->
-    skip(Messages, Q1Skip, 0, [KeyMessage | Agg]).

+ 0 - 36
apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl

@@ -1,36 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--define(tag, 1).
--define(it, 2).
--define(qos1_skip, 3).
--define(qos2_skip, 4).
-
--define(IT, -1000).
-
--define(skipping_iterator_match, #{?tag := ?IT}).
-
--define(skipping_iterator_match(Iterator, Q1Skip, Q2Skip), #{
-    ?tag := ?IT, ?it := Iterator, ?qos1_skip := Q1Skip, ?qos2_skip := Q2Skip
-}).
-
--define(skipping_iterator(Iterator, Q1Skip, Q2Skip), #{
-    ?tag => ?IT, ?it => Iterator, ?qos1_skip => Q1Skip, ?qos2_skip => Q2Skip
-}).
-
--define(QOS_0, 0).
--define(QOS_1, 1).
--define(QOS_2, 2).

+ 19 - 6
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -328,13 +328,13 @@ consume_n_matching(Map, Pred, N, S0, Acc) ->
             end
     end.
 
-persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) ->
-    case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of
+persistent_route_to_subscription(#route{dest = Dest} = Route) ->
+    case get_client_subscription(Route) of
         #{subopts := SubOpts} ->
             #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
             #{
-                topic => Topic,
-                clientid => SessionId,
+                topic => format_topic(Route),
+                clientid => session_id(Dest),
                 node => all,
 
                 qos => Qos,
@@ -345,13 +345,26 @@ persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) ->
             };
         undefined ->
             #{
-                topic => Topic,
-                clientid => SessionId,
+                topic => format_topic(Route),
+                clientid => session_id(Dest),
                 node => all,
                 durable => true
             }
     end.
 
+get_client_subscription(#route{topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}}) ->
+    emqx_persistent_session_ds:get_client_subscription(SessionId, #share{topic = Topic, group = Group});
+get_client_subscription(#route{topic = Topic, dest = SessionId}) ->
+    emqx_persistent_session_ds:get_client_subscription(SessionId, Topic).
+
+session_id(#share_dest{session_id = SessionId}) -> SessionId;
+session_id(SessionId) -> SessionId.
+
+format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) ->
+    <<"$share/", Group/binary, "/", Topic/binary>>;
+format_topic(#route{topic = Topic}) ->
+    Topic.
+
 %% @private This function merges paginated results from two sources.
 %%
 %% Note: this implementation is far from ideal: `count' for the