|
@@ -350,111 +350,6 @@ t_round_robin_per_group_even_distribution_two_groups(_) ->
|
|
|
),
|
|
),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
-t_round_robin_per_group_two_nodes_publish_to_same_node(_) ->
|
|
|
|
|
- ensure_config(round_robin_per_group),
|
|
|
|
|
- Node = start_slave('rr_p_g_t_n', 31337),
|
|
|
|
|
- ensure_node_config(Node, round_robin_per_group),
|
|
|
|
|
-
|
|
|
|
|
- %% connect two subscribers on each node
|
|
|
|
|
- Topic = <<"foo/bar">>,
|
|
|
|
|
- {ok, Subscriber0} = emqtt:start_link([{clientid, <<"C0">>}]),
|
|
|
|
|
- {ok, Subscriber1} = emqtt:start_link([{clientid, <<"C1">>}]),
|
|
|
|
|
- {ok, Subscriber2} = emqtt:start_link([{clientid, <<"C2">>}, {port, 31337}]),
|
|
|
|
|
- {ok, Subscriber3} = emqtt:start_link([{clientid, <<"C3">>}, {port, 31337}]),
|
|
|
|
|
- SubscriberPids = [Subscriber0, Subscriber1, Subscriber2, Subscriber3],
|
|
|
|
|
- lists:foreach(fun(P) -> emqtt:connect(P) end, SubscriberPids),
|
|
|
|
|
-
|
|
|
|
|
- %% node 1 subscribers
|
|
|
|
|
- emqtt:subscribe(Subscriber0, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- emqtt:subscribe(Subscriber1, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- %% node 2 subscribers
|
|
|
|
|
- emqtt:subscribe(Subscriber2, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- emqtt:subscribe(Subscriber3, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
-
|
|
|
|
|
- publish_fire_and_forget(10, Topic),
|
|
|
|
|
-
|
|
|
|
|
- AllMessages = recv_msgs(10),
|
|
|
|
|
- MessagesBySubscriber = lists:foldl(
|
|
|
|
|
- fun(#{client_pid := Subscriber, payload := Payload}, Acc) ->
|
|
|
|
|
- maps:update_with(Subscriber, fun(T) -> [Payload | T] end, [Payload], Acc)
|
|
|
|
|
- end,
|
|
|
|
|
- maps:new(),
|
|
|
|
|
- AllMessages
|
|
|
|
|
- ),
|
|
|
|
|
- lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, SubscriberPids),
|
|
|
|
|
- stop_slave(Node),
|
|
|
|
|
-
|
|
|
|
|
- ?assertEqual(
|
|
|
|
|
- #{
|
|
|
|
|
- Subscriber0 => [<<"0">>, <<"4">>, <<"8">>],
|
|
|
|
|
- Subscriber1 => [<<"1">>, <<"5">>, <<"9">>],
|
|
|
|
|
- Subscriber2 => [<<"2">>, <<"6">>],
|
|
|
|
|
- Subscriber3 => [<<"3">>, <<"7">>]
|
|
|
|
|
- },
|
|
|
|
|
- MessagesBySubscriber
|
|
|
|
|
- ).
|
|
|
|
|
-
|
|
|
|
|
-t_round_robin_per_group_two_nodes_alternating_publish(_) ->
|
|
|
|
|
- ensure_config(round_robin_per_group),
|
|
|
|
|
- Node = start_slave('rr_p_g_t_n_2', 41338),
|
|
|
|
|
- ensure_node_config(Node, round_robin_per_group),
|
|
|
|
|
-
|
|
|
|
|
- %% connect two subscribers on each node
|
|
|
|
|
- Topic = <<"foo/bar">>,
|
|
|
|
|
- {ok, Subscriber0} = emqtt:start_link([{clientid, <<"C0">>}]),
|
|
|
|
|
- {ok, Subscriber1} = emqtt:start_link([{clientid, <<"C1">>}]),
|
|
|
|
|
- {ok, Subscriber2} = emqtt:start_link([{clientid, <<"C2">>}, {port, 41338}]),
|
|
|
|
|
- {ok, Subscriber3} = emqtt:start_link([{clientid, <<"C3">>}, {port, 41338}]),
|
|
|
|
|
- SubscriberPids = [Subscriber0, Subscriber1, Subscriber2, Subscriber3],
|
|
|
|
|
- lists:foreach(fun(P) -> emqtt:connect(P) end, SubscriberPids),
|
|
|
|
|
-
|
|
|
|
|
- %% node 1 subscribers
|
|
|
|
|
- emqtt:subscribe(Subscriber0, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- emqtt:subscribe(Subscriber1, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- %% node 2 subscribers
|
|
|
|
|
- emqtt:subscribe(Subscriber2, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
- emqtt:subscribe(Subscriber3, {<<"$share/group1/", Topic/binary>>, 0}),
|
|
|
|
|
-
|
|
|
|
|
- %% alternate publish messages between the nodes
|
|
|
|
|
- lists:foreach(
|
|
|
|
|
- fun(I) ->
|
|
|
|
|
- Message = erlang:integer_to_binary(I),
|
|
|
|
|
- {ok, PublisherPid} =
|
|
|
|
|
- case I rem 2 of
|
|
|
|
|
- 0 -> emqtt:start_link();
|
|
|
|
|
- 1 -> emqtt:start_link([{port, 41338}])
|
|
|
|
|
- end,
|
|
|
|
|
- {ok, _} = emqtt:connect(PublisherPid),
|
|
|
|
|
- emqtt:publish(PublisherPid, Topic, Message),
|
|
|
|
|
- emqtt:stop(PublisherPid),
|
|
|
|
|
- ct:sleep(50)
|
|
|
|
|
- end,
|
|
|
|
|
- lists:seq(0, 9)
|
|
|
|
|
- ),
|
|
|
|
|
-
|
|
|
|
|
- AllMessages = recv_msgs(10),
|
|
|
|
|
- MessagesBySubscriber = lists:foldl(
|
|
|
|
|
- fun(#{client_pid := Subscriber, payload := Payload}, Acc) ->
|
|
|
|
|
- maps:update_with(Subscriber, fun(T) -> [Payload | T] end, [Payload], Acc)
|
|
|
|
|
- end,
|
|
|
|
|
- maps:new(),
|
|
|
|
|
- AllMessages
|
|
|
|
|
- ),
|
|
|
|
|
- lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, SubscriberPids),
|
|
|
|
|
- stop_slave(Node),
|
|
|
|
|
-
|
|
|
|
|
- %% this result show that when clustered round_robin_per_group behaves like the normal round_robin
|
|
|
|
|
- %% strategy meaning that subscribers receive two consecutive messages which is not ideal
|
|
|
|
|
- ?assertEqual(
|
|
|
|
|
- #{
|
|
|
|
|
- Subscriber0 => [<<"0">>, <<"1">>, <<"8">>, <<"9">>],
|
|
|
|
|
- Subscriber1 => [<<"2">>, <<"3">>],
|
|
|
|
|
- Subscriber2 => [<<"4">>, <<"5">>],
|
|
|
|
|
- Subscriber3 => [<<"6">>, <<"7">>]
|
|
|
|
|
- },
|
|
|
|
|
- MessagesBySubscriber
|
|
|
|
|
- ).
|
|
|
|
|
-
|
|
|
|
|
t_sticky(_) ->
|
|
t_sticky(_) ->
|
|
|
ok = ensure_config(sticky, true),
|
|
ok = ensure_config(sticky, true),
|
|
|
test_two_messages(sticky).
|
|
test_two_messages(sticky).
|