Преглед изворни кода

feat(queue): fix progress reporting and more tests

We test reassignment during the intensive replay
Ilya Averyanov пре 1 година
родитељ
комит
8dce530d15

+ 2 - 2
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -152,7 +152,7 @@ to_map(_S, _SharedSubS) ->
 stream_progresses(S) ->
 stream_progresses(S) ->
     fold_shared_stream_states(
     fold_shared_stream_states(
         fun(TopicFilter, Stream, SRS, Acc) ->
         fun(TopicFilter, Stream, SRS, Acc) ->
-            #srs{it_begin = BeginIt} = SRS,
+            #srs{it_end = EndIt} = SRS,
 
 
             case is_stream_fully_acked(S, SRS) of
             case is_stream_fully_acked(S, SRS) of
                 true ->
                 true ->
@@ -161,7 +161,7 @@ stream_progresses(S) ->
                     StreamProgress = #{
                     StreamProgress = #{
                         topic_filter => TopicFilter,
                         topic_filter => TopicFilter,
                         stream => Stream,
                         stream => Stream,
-                        iterator => BeginIt,
+                        iterator => EndIt,
                         use_finished => is_use_finished(S, SRS)
                         use_finished => is_use_finished(S, SRS)
                     },
                     },
                     [StreamProgress | Acc];
                     [StreamProgress | Acc];

+ 8 - 10
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -540,7 +540,7 @@ update_stream_progresses(
     }.
     }.
 
 
 clean_revoked_streams(
 clean_revoked_streams(
-    Data0, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses
+    Data0, _Agent, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses
 ) ->
 ) ->
     FinishedReportedStreams = maps:from_list(
     FinishedReportedStreams = maps:from_list(
         lists:filtermap(
         lists:filtermap(
@@ -569,13 +569,7 @@ clean_revoked_streams(
     {AgentState1, Data1}.
     {AgentState1, Data1}.
 
 
 unassign_streams(#{stream_owners := StreamOwners0} = Data, Streams) ->
 unassign_streams(#{stream_owners := StreamOwners0} = Data, Streams) ->
-    StreamOwners1 = lists:foldl(
-        fun(Stream, StreamOwnersAcc) ->
-            maps:remove(Stream, StreamOwnersAcc)
-        end,
-        StreamOwners0,
-        Streams
-    ),
+    StreamOwners1 = maps:without(Streams, StreamOwners0),
     Data#{
     Data#{
         stream_owners => StreamOwners1
         stream_owners => StreamOwners1
     }.
     }.
@@ -591,7 +585,9 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
             %% Client started updating
             %% Client started updating
             Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
             Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
             AgentState1 = update_agent_timeout(AgentState0),
             AgentState1 = update_agent_timeout(AgentState0),
-            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
+            {AgentState2, Data2} = clean_revoked_streams(
+                Data1, Agent, AgentState1, AgentStreamProgresses
+            ),
             AgentState3 =
             AgentState3 =
                 case AgentState2 of
                 case AgentState2 of
                     #{revoked_streams := []} ->
                     #{revoked_streams := []} ->
@@ -603,7 +599,9 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
         {?updating, AgentPrevVersion, AgentVersion} ->
         {?updating, AgentPrevVersion, AgentVersion} ->
             Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
             Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
             AgentState1 = update_agent_timeout(AgentState0),
             AgentState1 = update_agent_timeout(AgentState0),
-            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
+            {AgentState2, Data2} = clean_revoked_streams(
+                Data1, Agent, AgentState1, AgentStreamProgresses
+            ),
             AgentState3 =
             AgentState3 =
                 case AgentState2 of
                 case AgentState2 of
                     #{revoked_streams := []} ->
                     #{revoked_streams := []} ->

+ 100 - 1
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -10,7 +10,6 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 
 
--include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
 
 all() ->
 all() ->
@@ -184,6 +183,89 @@ t_graceful_disconnect(_Config) ->
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
+t_intensive_reassign(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr8/topic8/#">>, 1),
+
+    ct:sleep(1000),
+
+    NPubs = 10_000,
+
+    Topics = [<<"topic8/1">>, <<"topic8/2">>, <<"topic8/3">>],
+    ok = publish_n(ConnPub, Topics, 1, NPubs),
+
+    Self = self(),
+    _ = spawn_link(fun() ->
+        ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
+        Self ! publish_done
+    end),
+
+    ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
+    ConnShared3 = emqtt_connect_sub(<<"client_shared3">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr8/topic8/#">>, 1),
+    {ok, _, _} = emqtt:subscribe(ConnShared3, <<"$share/gr8/topic8/#">>, 1),
+
+    receive
+        publish_done -> ok
+    end,
+
+    Pubs = drain_publishes(),
+
+    ClientByBid = fun(Pid) ->
+        case Pid of
+            ConnShared1 -> <<"client_shared1">>;
+            ConnShared2 -> <<"client_shared2">>;
+            ConnShared3 -> <<"client_shared3">>
+        end
+    end,
+
+    Messages = lists:foldl(
+        fun(#{payload := Payload, client_pid := Pid}, Acc) ->
+            maps:update_with(
+                binary_to_integer(Payload),
+                fun(Clients) ->
+                    [ClientByBid(Pid) | Clients]
+                end,
+                [ClientByBid(Pid)],
+                Acc
+            )
+        end,
+        #{},
+        Pubs
+    ),
+
+    Missing = lists:filter(
+        fun(N) -> not maps:is_key(N, Messages) end,
+        lists:seq(1, 2 * NPubs)
+    ),
+    Duplicate = lists:filtermap(
+        fun(N) ->
+            case Messages of
+                #{N := [_]} -> false;
+                #{N := [_ | _] = Clients} -> {true, {N, Clients}};
+                _ -> false
+            end
+        end,
+        lists:seq(1, 2 * NPubs)
+    ),
+
+    ?assertEqual(
+        [],
+        Missing
+    ),
+
+    ?assertEqual(
+        [],
+        Duplicate
+    ),
+
+    ok = emqtt:disconnect(ConnShared1),
+    ok = emqtt:disconnect(ConnShared2),
+    ok = emqtt:disconnect(ConnShared3),
+    ok = emqtt:disconnect(ConnPub).
+
 t_lease_reconnect(_Config) ->
 t_lease_reconnect(_Config) ->
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
 
 
@@ -265,3 +347,20 @@ terminate_leaders() ->
     ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
     ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
     {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
     {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
     ok.
     ok.
+
+publish_n(_Conn, _Topics, From, To) when From > To ->
+    ok;
+publish_n(Conn, [Topic | RestTopics], From, To) ->
+    {ok, _} = emqtt:publish(Conn, Topic, integer_to_binary(From), 1),
+    publish_n(Conn, RestTopics ++ [Topic], From + 1, To).
+
+drain_publishes() ->
+    drain_publishes([]).
+
+drain_publishes(Acc) ->
+    receive
+        {publish, Msg} ->
+            drain_publishes([Msg | Acc])
+    after 5_000 ->
+        lists:reverse(Acc)
+    end.