Przeglądaj źródła

test(dssubs): add regression test with multiple groups / subscribers

To verify later that changes to the storage and topic structure layout
do not affect the correctness.
Andrew Mayorov 1 rok temu
rodzic
commit
b59002f144

+ 76 - 0
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -425,6 +425,82 @@ t_intensive_reassign(_Config) ->
     ok = emqtt:disconnect(ConnShared3),
     ok = emqtt:disconnect(ConnShared3),
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
+t_multiple_groups(groups, _Groups) ->
+    [declare_explicit];
+t_multiple_groups('init', Config) ->
+    Now = emqx_message:timestamp_now(),
+    NQueues = 50,
+    Group = <<"multi">>,
+    Topics = [emqx_utils:format("t/mg/~p", [I]) || I <- lists:seq(1, NQueues)],
+    Queues = lists:map(
+        fun(Topic) ->
+            {ok, Queue} = emqx_ds_shared_sub_queue:declare(Group, wildcard(Topic), Now, 0),
+            Queue
+        end,
+        Topics
+    ),
+    [
+        {queue_group, Group},
+        {queue_topics, Topics},
+        {queues, Queues}
+        | Config
+    ];
+t_multiple_groups('end', Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    lists:foreach(
+        fun(Topic) -> emqx_ds_shared_sub_queue:destroy(<<"multi">>, Topic) end,
+        Topics
+    ).
+
+t_multiple_groups(Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    NSubs = 20,
+    NPubs = 1000,
+    NQueues = length(Topics),
+    ConnPub = emqtt_connect_pub(<<"t_multiple_groups:pub">>),
+    ConnSubs = lists:map(
+        fun(I) ->
+            ClientId = emqx_utils:format("t_multiple_groups:sub:~p", [I]),
+            ConnSub = emqtt_connect_sub(ClientId),
+            ok = lists:foreach(
+                fun(Ti) ->
+                    Topic = lists:nth(Ti, Topics),
+                    TopicSub = emqx_topic:join([<<"$share/multi">>, wildcard(Topic)]),
+                    {ok, _, [1]} = emqtt:subscribe(ConnSub, TopicSub, 1)
+                end,
+                lists:seq(I, NQueues, NSubs)
+            ),
+            ConnSub
+        end,
+        lists:seq(1, NSubs)
+    ),
+
+    Payloads = lists:map(
+        fun(Pi) ->
+            Qi = pick_queue(Pi, NQueues),
+            Payload = integer_to_binary(Pi),
+            TopicPub = emqx_topic:join([lists:nth(Qi, Topics), integer_to_binary(Pi)]),
+            {ok, _} = emqtt:publish(ConnPub, TopicPub, Payload, 1),
+            Payload
+        end,
+        lists:seq(1, NPubs)
+    ),
+
+    Pubs = drain_publishes(),
+    ?assertMatch(
+        {[_ | _], []},
+        lists:partition(fun(#{payload := P}) -> lists:member(P, Payloads) end, Pubs)
+    ),
+
+    lists:foreach(fun emqtt:disconnect/1, [ConnPub | ConnSubs]).
+
+pick_queue(I, NQueues) ->
+    %% NOTE: Allocate publishes to queues unevenly, but every queue is utilized.
+    round(math:sqrt(NQueues) * math:log2(I)) rem NQueues + 1.
+
+wildcard(Topic) ->
+    emqx_topic:join([Topic, '#']).
+
 t_unsubscribe('init', Config) ->
 t_unsubscribe('init', Config) ->
     declare_queue_if_needed(<<"gr9">>, <<"topic9/#">>, Config);
     declare_queue_if_needed(<<"gr9">>, <<"topic9/#">>, Config);
 t_unsubscribe('end', Config) ->
 t_unsubscribe('end', Config) ->