emqx_shared_sub_SUITE.erl 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_shared_sub_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include("emqx.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -define(SUITE, ?MODULE).
  23. -define(wait(For, Timeout),
  24. emqx_ct_helpers:wait_for(
  25. ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
  26. all() -> emqx_ct:all(?SUITE).
  27. init_per_suite(Config) ->
  28. emqx_ct_helpers:start_apps([]),
  29. Config.
  30. end_per_suite(_Config) ->
  31. emqx_ct_helpers:stop_apps([]).
  32. t_random_basic(_) ->
  33. ok = ensure_config(random),
  34. ClientId = <<"ClientId">>,
  35. Topic = <<"foo">>,
  36. Payload = <<"hello">>,
  37. emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
  38. MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
  39. %% wait for the subscription to show up
  40. ct:sleep(200),
  41. ?assertEqual(true, subscribed(<<"group1">>, Topic, self())),
  42. emqx:publish(MsgQoS2),
  43. receive
  44. {deliver, Topic0, #message{from = ClientId0,
  45. payload = Payload0}} = M->
  46. ct:pal("==== received: ~p", [M]),
  47. ?assertEqual(Topic, Topic0),
  48. ?assertEqual(ClientId, ClientId0),
  49. ?assertEqual(Payload, Payload0)
  50. after 1000 -> ct:fail(waiting_basic_failed)
  51. end,
  52. ok.
  53. %% Start two subscribers share subscribe to "$share/g1/foo/bar"
  54. %% Set 'sticky' dispatch strategy, send 1st message to find
  55. %% out which member it picked, then close its connection
  56. %% send the second message, the message should be 'nack'ed
  57. %% by the sticky session and delivered to the 2nd session.
  58. %% After the connection for the 2nd session is also closed,
  59. %% i.e. when all clients are offline, the following message(s)
  60. %% should be delivered randomly.
  61. t_no_connection_nack(_) ->
  62. ok = ensure_config(sticky),
  63. Publisher = <<"publisher">>,
  64. Subscriber1 = <<"Subscriber1">>,
  65. Subscriber2 = <<"Subscriber2">>,
  66. QoS = 1,
  67. Group = <<"g1">>,
  68. Topic = <<"foo/bar">>,
  69. ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>,
  70. ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
  71. {ok, SubConnPid1} = emqtt:start_link([{client_id, Subscriber1}] ++ ExpProp),
  72. {ok, _Props} = emqtt:connect(SubConnPid1),
  73. {ok, SubConnPid2} = emqtt:start_link([{client_id, Subscriber2}] ++ ExpProp),
  74. {ok, _Props} = emqtt:connect(SubConnPid2),
  75. emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
  76. emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
  77. %% wait for the subscriptions to show up
  78. ct:sleep(200),
  79. MkPayload = fun(PacketId) ->
  80. iolist_to_binary(["hello-", integer_to_list(PacketId)])
  81. end,
  82. SendF = fun(PacketId) ->
  83. M = emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)),
  84. emqx:publish(M#message{id = PacketId})
  85. end,
  86. SendF(1),
  87. timer:sleep(200),
  88. %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
  89. ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
  90. %% Now kill the connection, expect all following messages to be delivered to the other subscriber.
  91. %emqx_mock_client:stop(ConnPid),
  92. %% sleep then make synced calls to session processes to ensure that
  93. %% the connection pid's 'EXIT' message is propagated to the session process
  94. %% also to be sure sessions are still alive
  95. % timer:sleep(2),
  96. % _ = emqx_session:info(SPid1),
  97. % _ = emqx_session:info(SPid2),
  98. % %% Now we know what is the other still alive connection
  99. % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
  100. % %% Send some more messages
  101. % PacketIdList = lists:seq(2, 10),
  102. % lists:foreach(fun(Id) ->
  103. % SendF(Id),
  104. % ?wait(Received(Id, TheOtherConnPid), 1000)
  105. % end, PacketIdList),
  106. % %% Now close the 2nd (last connection)
  107. % emqx_mock_client:stop(TheOtherConnPid),
  108. % timer:sleep(2),
  109. % %% both sessions should have conn_pid = undefined
  110. % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
  111. % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
  112. % %% send more messages, but all should be queued in session state
  113. % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
  114. % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
  115. % {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
  116. % ?assertEqual(length(PacketIdList), L1 + L2),
  117. % %% clean up
  118. % emqx_mock_client:close_session(PubConnPid),
  119. % emqx_sm:close_session(SPid1),
  120. % emqx_sm:close_session(SPid2),
  121. ok.
  122. t_random(_) ->
  123. test_two_messages(random).
  124. t_round_robin(_) ->
  125. test_two_messages(round_robin).
  126. t_sticky(_) ->
  127. test_two_messages(sticky).
  128. t_hash(_) ->
  129. test_two_messages(hash, false).
  130. %% if the original subscriber dies, change to another one alive
  131. t_not_so_sticky(_) ->
  132. ok = ensure_config(sticky),
  133. ClientId1 = <<"ClientId1">>,
  134. ClientId2 = <<"ClientId2">>,
  135. {ok, C1} = emqx_client:start_link([{client_id, ClientId1}]),
  136. {ok, _} = emqx_client:connect(C1),
  137. {ok, C2} = emqx_client:start_link([{client_id, ClientId2}]),
  138. {ok, _} = emqx_client:connect(C2),
  139. emqx_client:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}),
  140. timer:sleep(50),
  141. emqx_client:publish(C2, <<"foo/bar">>, <<"hello1">>),
  142. ?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),
  143. emqx_client:unsubscribe(C1, <<"$share/group1/foo/bar">>),
  144. timer:sleep(50),
  145. emqx_client:subscribe(C1, {<<"$share/group1/foo/#">>, 0}),
  146. timer:sleep(50),
  147. emqx_client:publish(C2, <<"foo/bar">>, <<"hello2">>),
  148. ?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),
  149. emqx_client:disconnect(C1),
  150. emqx_client:disconnect(C2),
  151. ok.
  152. test_two_messages(Strategy) ->
  153. test_two_messages(Strategy, _WithAck = true).
  154. test_two_messages(Strategy, WithAck) ->
  155. ok = ensure_config(Strategy, WithAck),
  156. Topic = <<"foo/bar">>,
  157. ClientId1 = <<"ClientId1">>,
  158. ClientId2 = <<"ClientId2">>,
  159. {ok, ConnPid1} = emqx_client:start_link([{client_id, ClientId1}]),
  160. {ok, _} = emqx_client:connect(ConnPid1),
  161. {ok, ConnPid2} = emqx_client:start_link([{client_id, ClientId2}]),
  162. {ok, _} = emqx_client:connect(ConnPid2),
  163. Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
  164. Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
  165. emqx_client:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
  166. emqx_client:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
  167. ct:sleep(100),
  168. emqx:publish(Message1),
  169. Me = self(),
  170. WaitF = fun(ExpectedPayload) ->
  171. case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
  172. {true, Pid} ->
  173. Me ! {subscriber, Pid},
  174. true;
  175. Other ->
  176. Other
  177. end
  178. end,
  179. WaitF(<<"hello1">>),
  180. UsedSubPid1 = receive {subscriber, P1} -> P1 end,
  181. emqx_broker:publish(Message2),
  182. WaitF(<<"hello2">>),
  183. UsedSubPid2 = receive {subscriber, P2} -> P2 end,
  184. case Strategy of
  185. sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
  186. round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
  187. hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
  188. _ -> ok
  189. end,
  190. emqx_client:stop(ConnPid1),
  191. emqx_client:stop(ConnPid2),
  192. ok.
  193. last_message(ExpectedPayload, Pids) ->
  194. receive
  195. {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
  196. ct:pal("~p ====== ~p", [Pids, Pid]),
  197. {true, Pid}
  198. after 100 ->
  199. <<"not yet?">>
  200. end.
  201. %%--------------------------------------------------------------------
  202. %% help functions
  203. %%--------------------------------------------------------------------
  204. ensure_config(Strategy) ->
  205. ensure_config(Strategy, _AckEnabled = true).
  206. ensure_config(Strategy, AckEnabled) ->
  207. application:set_env(emqx, shared_subscription_strategy, Strategy),
  208. application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
  209. ok.
  210. subscribed(Group, Topic, Pid) ->
  211. lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
  212. recv_msgs(Count) ->
  213. recv_msgs(Count, []).
  214. recv_msgs(0, Msgs) ->
  215. Msgs;
  216. recv_msgs(Count, Msgs) ->
  217. receive
  218. {publish, Msg} ->
  219. recv_msgs(Count-1, [Msg|Msgs]);
  220. _Other -> recv_msgs(Count, Msgs) %%TODO:: remove the branch?
  221. after 100 ->
  222. Msgs
  223. end.