| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_shared_sub_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include("emqx.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -define(SUITE, ?MODULE).
- -define(wait(For, Timeout),
- emqx_ct_helpers:wait_for(
- ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
- all() -> emqx_ct:all(?SUITE).
- init_per_suite(Config) ->
- emqx_ct_helpers:start_apps([]),
- Config.
- end_per_suite(_Config) ->
- emqx_ct_helpers:stop_apps([]).
- t_random_basic(_) ->
- ok = ensure_config(random),
- ClientId = <<"ClientId">>,
- Topic = <<"foo">>,
- Payload = <<"hello">>,
- emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
- MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
- %% wait for the subscription to show up
- ct:sleep(200),
- ?assertEqual(true, subscribed(<<"group1">>, Topic, self())),
- emqx:publish(MsgQoS2),
- receive
- {deliver, Topic0, #message{from = ClientId0,
- payload = Payload0}} = M->
- ct:pal("==== received: ~p", [M]),
- ?assertEqual(Topic, Topic0),
- ?assertEqual(ClientId, ClientId0),
- ?assertEqual(Payload, Payload0)
- after 1000 -> ct:fail(waiting_basic_failed)
- end,
- ok.
- %% Start two subscribers share subscribe to "$share/g1/foo/bar"
- %% Set 'sticky' dispatch strategy, send 1st message to find
- %% out which member it picked, then close its connection
- %% send the second message, the message should be 'nack'ed
- %% by the sticky session and delivered to the 2nd session.
- %% After the connection for the 2nd session is also closed,
- %% i.e. when all clients are offline, the following message(s)
- %% should be delivered randomly.
- t_no_connection_nack(_) ->
- ok = ensure_config(sticky),
- Publisher = <<"publisher">>,
- Subscriber1 = <<"Subscriber1">>,
- Subscriber2 = <<"Subscriber2">>,
- QoS = 1,
- Group = <<"g1">>,
- Topic = <<"foo/bar">>,
- ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>,
- ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
- {ok, SubConnPid1} = emqtt:start_link([{client_id, Subscriber1}] ++ ExpProp),
- {ok, _Props} = emqtt:connect(SubConnPid1),
- {ok, SubConnPid2} = emqtt:start_link([{client_id, Subscriber2}] ++ ExpProp),
- {ok, _Props} = emqtt:connect(SubConnPid2),
- emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
- emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
- %% wait for the subscriptions to show up
- ct:sleep(200),
- MkPayload = fun(PacketId) ->
- iolist_to_binary(["hello-", integer_to_list(PacketId)])
- end,
- SendF = fun(PacketId) ->
- M = emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)),
- emqx:publish(M#message{id = PacketId})
- end,
- SendF(1),
- timer:sleep(200),
- %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
- ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
- %% Now kill the connection, expect all following messages to be delivered to the other subscriber.
- %emqx_mock_client:stop(ConnPid),
- %% sleep then make synced calls to session processes to ensure that
- %% the connection pid's 'EXIT' message is propagated to the session process
- %% also to be sure sessions are still alive
- % timer:sleep(2),
- % _ = emqx_session:info(SPid1),
- % _ = emqx_session:info(SPid2),
- % %% Now we know what is the other still alive connection
- % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
- % %% Send some more messages
- % PacketIdList = lists:seq(2, 10),
- % lists:foreach(fun(Id) ->
- % SendF(Id),
- % ?wait(Received(Id, TheOtherConnPid), 1000)
- % end, PacketIdList),
- % %% Now close the 2nd (last connection)
- % emqx_mock_client:stop(TheOtherConnPid),
- % timer:sleep(2),
- % %% both sessions should have conn_pid = undefined
- % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
- % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
- % %% send more messages, but all should be queued in session state
- % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
- % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
- % {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
- % ?assertEqual(length(PacketIdList), L1 + L2),
- % %% clean up
- % emqx_mock_client:close_session(PubConnPid),
- % emqx_sm:close_session(SPid1),
- % emqx_sm:close_session(SPid2),
- ok.
- t_random(_) ->
- test_two_messages(random).
- t_round_robin(_) ->
- test_two_messages(round_robin).
- t_sticky(_) ->
- test_two_messages(sticky).
- t_hash(_) ->
- test_two_messages(hash, false).
- %% if the original subscriber dies, change to another one alive
- t_not_so_sticky(_) ->
- ok = ensure_config(sticky),
- ClientId1 = <<"ClientId1">>,
- ClientId2 = <<"ClientId2">>,
- {ok, C1} = emqx_client:start_link([{client_id, ClientId1}]),
- {ok, _} = emqx_client:connect(C1),
- {ok, C2} = emqx_client:start_link([{client_id, ClientId2}]),
- {ok, _} = emqx_client:connect(C2),
- emqx_client:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}),
- timer:sleep(50),
- emqx_client:publish(C2, <<"foo/bar">>, <<"hello1">>),
- ?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),
- emqx_client:unsubscribe(C1, <<"$share/group1/foo/bar">>),
- timer:sleep(50),
- emqx_client:subscribe(C1, {<<"$share/group1/foo/#">>, 0}),
- timer:sleep(50),
- emqx_client:publish(C2, <<"foo/bar">>, <<"hello2">>),
- ?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),
- emqx_client:disconnect(C1),
- emqx_client:disconnect(C2),
- ok.
- test_two_messages(Strategy) ->
- test_two_messages(Strategy, _WithAck = true).
- test_two_messages(Strategy, WithAck) ->
- ok = ensure_config(Strategy, WithAck),
- Topic = <<"foo/bar">>,
- ClientId1 = <<"ClientId1">>,
- ClientId2 = <<"ClientId2">>,
- {ok, ConnPid1} = emqx_client:start_link([{client_id, ClientId1}]),
- {ok, _} = emqx_client:connect(ConnPid1),
- {ok, ConnPid2} = emqx_client:start_link([{client_id, ClientId2}]),
- {ok, _} = emqx_client:connect(ConnPid2),
- Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
- Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
- emqx_client:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
- emqx_client:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
- ct:sleep(100),
- emqx:publish(Message1),
- Me = self(),
- WaitF = fun(ExpectedPayload) ->
- case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
- {true, Pid} ->
- Me ! {subscriber, Pid},
- true;
- Other ->
- Other
- end
- end,
- WaitF(<<"hello1">>),
- UsedSubPid1 = receive {subscriber, P1} -> P1 end,
- emqx_broker:publish(Message2),
- WaitF(<<"hello2">>),
- UsedSubPid2 = receive {subscriber, P2} -> P2 end,
- case Strategy of
- sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
- round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
- hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
- _ -> ok
- end,
- emqx_client:stop(ConnPid1),
- emqx_client:stop(ConnPid2),
- ok.
- last_message(ExpectedPayload, Pids) ->
- receive
- {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
- ct:pal("~p ====== ~p", [Pids, Pid]),
- {true, Pid}
- after 100 ->
- <<"not yet?">>
- end.
- %%--------------------------------------------------------------------
- %% help functions
- %%--------------------------------------------------------------------
- ensure_config(Strategy) ->
- ensure_config(Strategy, _AckEnabled = true).
- ensure_config(Strategy, AckEnabled) ->
- application:set_env(emqx, shared_subscription_strategy, Strategy),
- application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
- ok.
- subscribed(Group, Topic, Pid) ->
- lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
- recv_msgs(Count) ->
- recv_msgs(Count, []).
- recv_msgs(0, Msgs) ->
- Msgs;
- recv_msgs(Count, Msgs) ->
- receive
- {publish, Msg} ->
- recv_msgs(Count-1, [Msg|Msgs]);
- _Other -> recv_msgs(Count, Msgs) %%TODO:: remove the branch?
- after 100 ->
- Msgs
- end.
|