emqx_shared_sub_SUITE.erl 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_shared_sub_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("eunit/include/eunit.hrl").
  22. -include_lib("common_test/include/ct.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  %% Alias for this suite's module name, handed to the common test helpers.
  -define(SUITE, ?MODULE).

  %% ?WAIT(TIMEOUT, PATTERN, Res)
  %% Takes the next message from the mailbox. When it matches PATTERN the
  %% macro evaluates to Res (which may use variables bound by PATTERN).
  %% Any other message, or no message within TIMEOUT milliseconds, fails the
  %% test case through ct:fail/1. The receive is wrapped in an immediately
  %% applied fun so the variables it binds stay local to the macro expansion.
  -define(WAIT(TIMEOUT, PATTERN, Res),
      (fun() ->
          receive
              PATTERN ->
                  Res;
              Other ->
                  ct:fail(#{
                      expected => ??PATTERN,
                      got => Other
                  })
          after TIMEOUT ->
              ct:fail({timeout, ??PATTERN})
          end
      end)()
  ).

  %% Atoms of the shared-dispatch acknowledgement protocol checked by the
  %% t_maybe_ack test case.
  -define(ack, shared_sub_ack).
  -define(no_ack, no_ack).
  %% Common Test callback: the list of test cases, as computed by
  %% emqx_common_test_helpers:all/1 for this suite.
  all() ->
      emqx_common_test_helpers:all(?SUITE).
  %% Starts the emqx application through emqx_cth_suite and records the
  %% returned app handles under the `apps' key for end_per_suite/1.
  init_per_suite(Config) ->
      WorkDir = emqx_cth_suite:work_dir(Config),
      Apps = emqx_cth_suite:start([emqx], #{work_dir => WorkDir}),
      [{apps, Apps} | Config].
  %% Stops whatever init_per_suite/1 stored under the `apps' key.
  end_per_suite(Config) ->
      Apps = ?config(apps, Config),
      emqx_cth_suite:stop(Apps).
  %% Gives a test case the chance to do its own setup: the case function is
  %% called with {init, Config} and its result becomes the new Config.
  %% Cases without such a clause raise function_clause, in which case the
  %% Config is passed through untouched.
  init_per_testcase(Case, Config) ->
      try ?MODULE:Case({init, Config}) of
          CaseConfig ->
              CaseConfig
      catch
          error:function_clause ->
              Config
      end.
  %% Counterpart of init_per_testcase/2: calls the case function with
  %% {'end', Config} and returns its result, or `ok' when the case has no
  %% such clause (function_clause).
  end_per_testcase(Case, Config) ->
      try ?MODULE:Case({'end', Config}) of
          Result ->
              Result
      catch
          error:function_clause ->
              ok
      end.
  %% A message with empty headers must not require a shared-dispatch ack.
  t_is_ack_required(Config) when is_list(Config) ->
      Msg = #message{headers = #{}},
      ?assertEqual(false, emqx_shared_sub:is_ack_required(Msg)).
  %% maybe_nack_dropped/1 returns false for a message without ack headers.
  %% With a shared_dispatch_ack header pointing at this process it returns
  %% true and a `dropped' nack tagged with the header's reference arrives.
  t_maybe_nack_dropped(Config) when is_list(Config) ->
      Plain = #message{headers = #{}},
      ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(Plain)),
      AckHeaders = #{shared_dispatch_ack => {<<"group">>, self(), for_test}},
      Tracked = #message{headers = AckHeaders},
      ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Tracked)),
      Outcome =
          receive
              {for_test, {shared_sub_nack, dropped}} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, Outcome).
  %% nack_no_connection/1 returns ok and sends a `no_connection' nack,
  %% tagged with the header's reference, to the pid named in the header.
  t_nack_no_connection(Config) when is_list(Config) ->
      AckHeaders = #{shared_dispatch_ack => {<<"group">>, self(), for_test}},
      Tracked = #message{headers = AckHeaders},
      ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Tracked)),
      Outcome =
          receive
              {for_test, {shared_sub_nack, no_connection}} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, Outcome).
  %% maybe_ack/1 leaves a message without ack headers unchanged. For a
  %% message carrying a shared_dispatch_ack header it replaces the header
  %% value with ?no_ack and sends ?ack, tagged with the header's reference,
  %% to the pid named in the header.
  t_maybe_ack(Config) when is_list(Config) ->
      Plain = #message{headers = #{}},
      ?assertEqual(Plain, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
      AckHeaders = #{shared_dispatch_ack => {<<"group">>, self(), for_test}},
      Tracked = #message{headers = AckHeaders},
      Acked = #message{headers = #{shared_dispatch_ack => ?no_ack}},
      ?assertEqual(Acked, emqx_shared_sub:maybe_ack(Tracked)),
      Outcome =
          receive
              {for_test, ?ack} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, Outcome).
  %% With the `random' strategy, the test process itself subscribes to a
  %% shared topic through emqx_broker and must receive the published QoS 2
  %% message as a {deliver, Topic, Message} tuple within one second.
  t_random_basic(Config) when is_list(Config) ->
      ok = ensure_config(random),
      Group = <<"group1">>,
      Topic = <<"foo">>,
      ClientId = <<"ClientId">>,
      Payload = <<"hello">>,
      ShareRecord = emqx_topic:make_shared_record(Group, Topic),
      emqx_broker:subscribe(ShareRecord, #{qos => 2}),
      MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
      %% give the subscription time to show up
      ct:sleep(200),
      ?assertEqual(true, subscribed(Group, Topic, self())),
      emqx:publish(MsgQoS2),
      receive
          {deliver, GotTopic, #message{from = GotFrom, payload = GotPayload}} = Delivery ->
              ct:pal("==== received: ~p", [Delivery]),
              ?assertEqual(Topic, GotTopic),
              ?assertEqual(ClientId, GotFrom),
              ?assertEqual(Payload, GotPayload)
      after 1000 ->
          ct:fail(waiting_basic_failed)
      end,
      ok.
  %% Start two subscribers that share-subscribe to "$share/g1/foo/bar" with
  %% the 'sticky' dispatch strategy, then publish one QoS 1 message and
  %% check that exactly one publish with packet id 1 is received.
  %%
  %% The intended scenario goes further: close the picked member's
  %% connection, send a second message that should be 'nack'ed by the sticky
  %% session and delivered to the other one, and, once every client is
  %% offline, expect random delivery. Those later steps are NOT exercised by
  %% this test case as written; only the first delivery is asserted.
  t_no_connection_nack(Config) when is_list(Config) ->
      ok = ensure_config(sticky),
      Publisher = <<"publisher">>,
      Subscriber1 = <<"Subscriber1">>,
      Subscriber2 = <<"Subscriber2">>,
      QoS = 1,
      Group = <<"g1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>,
      ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
      {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp),
      {ok, _Props1} = emqtt:connect(SubConnPid1),
      {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp),
      {ok, _Props2} = emqtt:connect(SubConnPid2),
      %% Both clients must join the group; subscribing the first client twice
      %% would leave the group with a single member.
      emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
      emqtt:subscribe(SubConnPid2, ShareTopic, QoS),
      %% wait for the subscriptions to show up
      ct:sleep(200),
      MkPayload = fun(PacketId) ->
          iolist_to_binary(["hello-", integer_to_list(PacketId)])
      end,
      SendF = fun(PacketId) ->
          M = emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)),
          emqx:publish(M#message{id = PacketId})
      end,
      SendF(1),
      timer:sleep(200),
      %% Whichever connection the broker picked (sticky) got the 1st message
      Received = recv_msgs(1),
      %% stop the clients before asserting so they do not outlive a failure
      emqtt:stop(SubConnPid1),
      emqtt:stop(SubConnPid2),
      ?assertMatch([#{packet_id := 1}], Received),
      ok.
  %% Two-message scenario under the `random' strategy (second
  %% ensure_config/2 argument set to true).
  t_random(Config) when is_list(Config) ->
      Strategy = random,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two-message scenario under the `round_robin' strategy (second
  %% ensure_config/2 argument set to true).
  t_round_robin(Config) when is_list(Config) ->
      Strategy = round_robin,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two-message scenario under the `round_robin_per_group' strategy (second
  %% ensure_config/2 argument set to true).
  t_round_robin_per_group(Config) when is_list(Config) ->
      Strategy = round_robin_per_group,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two subscribers in one group, ten messages ("0".."9") from a single
  %% publisher connection: the first subscriber must get the even payloads
  %% and the second the odd ones, each in publishing order.
  %% (this would fail if executed with the standard round_robin strategy)
  t_round_robin_per_group_even_distribution_one_group(Config) when is_list(Config) ->
      ok = ensure_config(round_robin_per_group, true),
      Topic = <<"foo/bar">>,
      Group = <<"group1">>,
      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, <<"C1">>}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),
      %% publisher with persistent connection
      {ok, PublisherPid} = emqtt:start_link(),
      {ok, _} = emqtt:connect(PublisherPid),
      Publish = fun(I) ->
          emqtt:publish(PublisherPid, Topic, erlang:integer_to_binary(I))
      end,
      lists:foreach(Publish, lists:seq(0, 9)),
      ToPair = fun(#{client_pid := SubscriberPid, payload := Payload}) ->
          {SubscriberPid, Payload}
      end,
      AllReceivedMessages = lists:map(ToPair, lists:reverse(recv_msgs(10))),
      ReceivedBy = fun(Pid) ->
          [Pair || {P, _Payload} = Pair <- AllReceivedMessages, P == Pid]
      end,
      MessagesReceivedSubscriber1 = ReceivedBy(ConnPid1),
      MessagesReceivedSubscriber2 = ReceivedBy(ConnPid2),
      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),
      emqtt:stop(PublisherPid),
      %% each subscriber must have received 5 messages in alternating fashion:
      %% one gets all even payloads, the other all odd payloads
      Expect = fun(Pid, Numbers) ->
          [{Pid, erlang:integer_to_binary(N)} || N <- Numbers]
      end,
      ?assertEqual(Expect(ConnPid1, [0, 2, 4, 6, 8]), MessagesReceivedSubscriber1),
      ?assertEqual(Expect(ConnPid2, [1, 3, 5, 7, 9]), MessagesReceivedSubscriber2),
      ok.
  %% Same distribution check as the one-group case, but with two groups of
  %% two subscribers each on the same topic. Ten messages are published, so
  %% twenty deliveries are expected; within each group the first subscriber
  %% must see the even payloads and the second the odd ones.
  t_round_robin_per_group_even_distribution_two_groups(Config) when is_list(Config) ->
      ok = ensure_config(round_robin_per_group, true),
      Topic = <<"foo/bar">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, <<"C1">>}]),
      {ok, ConnPid3} = emqtt:start_link([{clientid, <<"C2">>}]),
      {ok, ConnPid4} = emqtt:start_link([{clientid, <<"C3">>}]),
      ConnPids = [ConnPid1, ConnPid2, ConnPid3, ConnPid4],
      lists:foreach(fun emqtt:connect/1, ConnPids),
      Group1Topic = <<"$share/group1/", Topic/binary>>,
      Group2Topic = <<"$share/group2/", Topic/binary>>,
      %% group1 subscribers
      emqtt:subscribe(ConnPid1, {Group1Topic, 0}),
      emqtt:subscribe(ConnPid2, {Group1Topic, 0}),
      %% group2 subscribers
      emqtt:subscribe(ConnPid3, {Group2Topic, 0}),
      emqtt:subscribe(ConnPid4, {Group2Topic, 0}),
      publish_fire_and_forget(10, Topic),
      ToPair = fun(#{client_pid := SubscriberPid, payload := Payload}) ->
          {SubscriberPid, Payload}
      end,
      AllReceivedMessages = lists:map(ToPair, lists:reverse(recv_msgs(20))),
      ReceivedBy = fun(Pid) ->
          [Pair || {P, _Payload} = Pair <- AllReceivedMessages, P == Pid]
      end,
      MessagesReceivedSubscriber1 = ReceivedBy(ConnPid1),
      MessagesReceivedSubscriber2 = ReceivedBy(ConnPid2),
      MessagesReceivedSubscriber3 = ReceivedBy(ConnPid3),
      MessagesReceivedSubscriber4 = ReceivedBy(ConnPid4),
      lists:foreach(fun emqtt:stop/1, ConnPids),
      %% each subscriber must have received 5 messages, alternating within
      %% its group: subscribers 1 and 3 get the even payloads, subscribers
      %% 2 and 4 the odd ones
      Expect = fun(Pid, Numbers) ->
          [{Pid, erlang:integer_to_binary(N)} || N <- Numbers]
      end,
      Even = [0, 2, 4, 6, 8],
      Odd = [1, 3, 5, 7, 9],
      ?assertEqual(Expect(ConnPid3, Even), MessagesReceivedSubscriber3),
      ?assertEqual(Expect(ConnPid2, Odd), MessagesReceivedSubscriber2),
      ?assertEqual(Expect(ConnPid4, Odd), MessagesReceivedSubscriber4),
      ?assertEqual(Expect(ConnPid1, Even), MessagesReceivedSubscriber1),
      ok.
  %% Two-message scenario under the `sticky' strategy (second
  %% ensure_config/2 argument set to true).
  t_sticky(Config) when is_list(Config) ->
      Strategy = sticky,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two subscribers in one shared group under the `sticky' strategy.
  %% The member that received the first message unsubscribes; the second
  %% message must then reach the other member of the group.
  t_sticky_unsubscribe(Config) when is_list(Config) ->
      ok = ensure_config(sticky, false),
      Topic = <<"foo/bar/sticky-unsub">>,
      Group = <<"gsu">>,
      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      ClientId1 = <<"c1-sticky-unsub">>,
      ClientId2 = <<"c2-sticky-unsub">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),
      Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
      ct:sleep(100),
      Members = [ConnPid1, ConnPid2],
      emqx:publish(Message1),
      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),
      %% the member that was just used leaves the group
      emqtt:unsubscribe(UsedSubPid1, ShareTopic),
      emqx:publish(Message2),
      {true, UsedSubPid2} = last_message(<<"hello2">>, Members),
      ?assertNotEqual(UsedSubPid1, UsedSubPid2),
      kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end),
      kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end),
      ok.
  %% Two-message scenario under `hash_clientid' (second ensure_config/2
  %% argument set to false); same body as t_hash_clientid/1.
  t_hash(Config) when is_list(Config) ->
      Strategy = hash_clientid,
      ok = ensure_config(Strategy, false),
      test_two_messages(Strategy).
  %% Two-message scenario under `hash_clientid' (second ensure_config/2
  %% argument set to false).
  t_hash_clientid(Config) when is_list(Config) ->
      Strategy = hash_clientid,
      ok = ensure_config(Strategy, false),
      test_two_messages(Strategy).
  %% Under the `hash_topic' strategy, two messages published on topics whose
  %% phash2 values differ modulo 2 must be delivered to different members of
  %% a two-member group subscribed with a wildcard filter.
  %%
  %% Each delivery is asserted directly on the result of last_message/2, so
  %% a missing delivery fails with a badmatch right away instead of blocking
  %% in a receive without timeout.
  t_hash_topic(Config) when is_list(Config) ->
      ok = ensure_config(hash_topic, false),
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid2),
      Topic1 = <<"foo/bar1">>,
      Topic2 = <<"foo/bar2">>,
      %% precondition: the two topics hash to different slots of a 2-member group
      ?assert(erlang:phash2(Topic1) rem 2 =/= erlang:phash2(Topic2) rem 2),
      Message1 = emqx_message:make(ClientId1, 0, Topic1, <<"hello1">>),
      Message2 = emqx_message:make(ClientId1, 0, Topic2, <<"hello2">>),
      emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/#">>, 0}),
      emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/#">>, 0}),
      ct:sleep(100),
      Members = [ConnPid1, ConnPid2],
      emqx:publish(Message1),
      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),
      emqx_broker:publish(Message2),
      {true, UsedSubPid2} = last_message(<<"hello2">>, Members),
      ?assert(UsedSubPid1 =/= UsedSubPid2),
      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),
      ok.
  %% Under the `sticky' strategy, C1 joins group1 on "foo/bar" and receives
  %% the first message published by C2. C1 then unsubscribes and re-joins the
  %% same group with the wildcard filter "foo/#"; the second message must
  %% still be delivered (one publish with payload "hello2" is expected).
  t_not_so_sticky(Config) when is_list(Config) ->
      ok = ensure_config(sticky),
      {ok, C1} = emqtt:start_link([{clientid, <<"ClientId1">>}]),
      {ok, _} = emqtt:connect(C1),
      {ok, C2} = emqtt:start_link([{clientid, <<"ClientId2">>}]),
      {ok, _} = emqtt:connect(C2),
      ExactFilter = <<"$share/group1/foo/bar">>,
      WildcardFilter = <<"$share/group1/foo/#">>,
      emqtt:subscribe(C1, {ExactFilter, 0}),
      timer:sleep(50),
      emqtt:publish(C2, <<"foo/bar">>, <<"hello1">>),
      ?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),
      emqtt:unsubscribe(C1, ExactFilter),
      timer:sleep(50),
      emqtt:subscribe(C1, {WildcardFilter, 0}),
      timer:sleep(50),
      emqtt:publish(C2, <<"foo/bar">>, <<"hello2">>),
      ?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),
      emqtt:disconnect(C1),
      emqtt:disconnect(C2),
      ok.
  %% Runs the two-message scenario in the default group <<"group1">>.
  test_two_messages(Strategy) ->
      DefaultGroup = <<"group1">>,
      test_two_messages(Strategy, DefaultGroup).
  %% Shared scenario: two clients join Group on "foo/bar", two QoS 0
  %% messages are published, and the members that received them are compared
  %% according to Strategy:
  %%   sticky, hash_clientid              -> same member both times
  %%   round_robin, round_robin_per_group -> different members
  %%   anything else                      -> no expectation
  test_two_messages(Strategy, Group) ->
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),
      Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
      ct:sleep(100),
      Members = [ConnPid1, ConnPid2],
      emqx:publish(Message1),
      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),
      emqx:publish(Message2),
      {true, UsedSubPid2} = last_message(<<"hello2">>, Members),
      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),
      case Strategy of
          Same when Same =:= sticky; Same =:= hash_clientid ->
              ?assertEqual(UsedSubPid1, UsedSubPid2);
          Alternating when Alternating =:= round_robin; Alternating =:= round_robin_per_group ->
              ?assertNotEqual(UsedSubPid1, UsedSubPid2);
          _ ->
              ok
      end,
      ok.
  %% last_message/3 with a default timeout of 1000 ms.
  last_message(ExpectedPayload, Pids) ->
      DefaultTimeout = 1000,
      last_message(ExpectedPayload, Pids, DefaultTimeout).
  %% Waits up to Timeout ms for a {publish, Map} message whose payload equals
  %% ExpectedPayload (other messages are left in the mailbox). Asserts that
  %% the receiving client pid is one of Pids and returns {true, Pid}.
  %% On timeout it logs "not yet" and returns the binary <<"not yet?">>.
  last_message(ExpectedPayload, Pids, Timeout) ->
      receive
          {publish, #{client_pid := SubPid, payload := ExpectedPayload}} ->
              IsMember = lists:member(SubPid, Pids),
              ?assert(IsMember),
              {true, SubPid}
      after Timeout ->
          ct:pal("not yet"),
          <<"not yet?">>
      end.
  %% emqx_shared_sub:dispatch/3 reports {error, no_subscribers} while the
  %% group has no member, and {ok, 1} once the test process has subscribed.
  t_dispatch(Config) when is_list(Config) ->
      ok = ensure_config(random),
      Topic = <<"foo">>,
      Group = <<"group1">>,
      Dispatch = fun() ->
          emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}})
      end,
      ?assertEqual({error, no_subscribers}, Dispatch()),
      emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}),
      ?assertEqual({ok, 1}, Dispatch()).
  %% Pokes the emqx_shared_sub server with an unknown call, an unknown cast
  %% and two raw messages; the call must answer `ignored'.
  t_uncovered_func(Config) when is_list(Config) ->
      Server = emqx_shared_sub,
      ignored = gen_server:call(Server, ignored),
      ok = gen_server:cast(Server, ignored),
      ignored = Server ! ignored,
      TableEvent = {mnesia_table_event, []},
      TableEvent = Server ! TableEvent.
  %% Configures a dispatch strategy per group and runs the two-message
  %% scenario against the groups. The sticky and round_robin groups are each
  %% exercised four times and the round_robin_per_group group twice: a single
  %% run could also pass by chance under a random dispatcher, so the
  %% repetition makes such a false pass unlikely.
  t_per_group_config(Config) when is_list(Config) ->
      ok = ensure_group_config(#{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky,
          <<"round_robin_per_group_group">> => round_robin_per_group
      }),
      Runs = [
          {sticky, <<"sticky_group">>},
          {sticky, <<"sticky_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin, <<"round_robin_group">>},
          {sticky, <<"sticky_group">>},
          {sticky, <<"sticky_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin_per_group, <<"round_robin_per_group_group">>},
          {round_robin_per_group, <<"round_robin_per_group_group">>}
      ],
      lists:foreach(
          fun({Strategy, Group}) -> test_two_messages(Strategy, Group) end,
          Runs
      ).
  %% `local' strategy across two nodes: one subscriber connects to this node
  %% and one to a peer node (listening on port 21999), both in "local_group".
  %% A message published on this node and one published on the peer (via
  %% rpc) must be received by different subscribers, and both nodes must
  %% report `local' as the strategy of the group.
  %%
  %% The peer node is stopped in an `after' section so that a failing step
  %% cannot leave it running (t_remote starts a peer on the same port).
  t_local(Config) when is_list(Config) ->
      GroupConfig = #{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      },
      Node = start_peer('local_shared_sub_local_1', 21999),
      try
          ok = ensure_group_config(GroupConfig),
          ok = ensure_group_config(Node, GroupConfig),
          Topic = <<"local_foo/bar">>,
          ClientId1 = <<"ClientId1">>,
          ClientId2 = <<"ClientId2">>,
          {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
          %% the second client connects to the peer node's listener
          {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {port, 21999}]),
          {ok, _} = emqtt:connect(ConnPid1),
          {ok, _} = emqtt:connect(ConnPid2),
          emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),
          emqtt:subscribe(ConnPid2, {<<"$share/local_group/", Topic/binary>>, 0}),
          ct:sleep(100),
          Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
          rpc:call(Node, emqx, publish, [Message2]),
          {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
          RemoteLocalGroupStrategy =
              rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]),
          emqtt:stop(ConnPid1),
          emqtt:stop(ConnPid2),
          ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
          ?assertEqual(local, RemoteLocalGroupStrategy),
          ?assertNotEqual(UsedSubPid1, UsedSubPid2),
          ok
      after
          stop_peer(Node)
      end.
  %% This testcase verifies dispatching of shared messages to the remote
  %% nodes via backplane API.
  %%
  %% In this testcase we start two EMQX nodes: local and remote.
  %% A subscriber connects to the remote node (port 21999).
  %% Three messages with QoS 0, 1 and 2 are published on the local node.
  %% The test verifies that the remote subscriber received all three.
  %%
  %% The messages carry the local client id as their `from' field; the value
  %% returned by emqtt:connect/1 is the CONNACK properties, not an id.
  t_remote(Config) when is_list(Config) ->
      ok = ensure_config(sticky, true),
      GroupConfig = #{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      },
      Node = start_peer('remote_shared_sub_remote_1', 21999),
      ok = ensure_group_config(GroupConfig),
      ok = ensure_group_config(Node, GroupConfig),
      Topic = <<"foo/bar">>,
      ClientIdLocal = <<"ClientId1">>,
      ClientIdRemote = <<"ClientId2">>,
      {ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]),
      {ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]),
      try
          {ok, _PropsLocal} = emqtt:connect(ConnPidLocal),
          {ok, _PropsRemote} = emqtt:connect(ConnPidRemote),
          %% NOTE(review): "remote_group" has no entry in GroupConfig, so it
          %% presumably falls back to the strategy set by ensure_config/2 above.
          emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}),
          ct:sleep(100),
          Message1 = emqx_message:make(ClientIdLocal, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientIdLocal, 1, Topic, <<"hello2">>),
          Message3 = emqx_message:make(ClientIdLocal, 2, Topic, <<"hello3">>),
          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]),
          emqx:publish(Message2),
          %% UsedSubPid1 is already bound: these matches also assert that the
          %% same subscriber received the message
          {true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]),
          emqx:publish(Message3),
          {true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]),
          ok
      after
          emqtt:stop(ConnPidLocal),
          emqtt:stop(ConnPidRemote),
          stop_peer(Node)
      end.
  %% `local' strategy with the only subscriber on this node: a message
  %% published here and a message published on a peer node (via rpc) must
  %% both be received by that one subscriber.
  %%
  %% The peer node is stopped in an `after' section so that a failing step
  %% cannot leave it running.
  t_local_fallback(Config) when is_list(Config) ->
      ok = ensure_group_config(#{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      }),
      Topic = <<"local_foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      Node = start_peer('local_fallback_shared_sub_1', 11888),
      try
          {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
          {ok, _} = emqtt:connect(ConnPid1),
          Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
          emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),
          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
          rpc:call(Node, emqx, publish, [Message2]),
          %% the message published on the peer gets a longer timeout (2 s)
          {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1], 2_000),
          emqtt:stop(ConnPid1),
          ?assertEqual(UsedSubPid1, UsedSubPid2),
          ok
      after
          stop_peer(Node)
      end.
  %% Checks that the broker picks another shared subscriber when the first
  %% one does not return an ACK. This variant runs with the ack flag enabled.
  t_redispatch_qos1_with_ack(Config) when is_list(Config) ->
      AckEnabled = true,
      test_redispatch_qos1(Config, AckEnabled).
  %% Same redispatch scenario, with the ack flag disabled.
  t_redispatch_qos1_no_ack(Config) when is_list(Config) ->
      AckEnabled = false,
      test_redispatch_qos1(Config, AckEnabled).
  %% Two clients with auto_ack disabled join group1 under the `sticky'
  %% strategy. A QoS 1 message is published; the member that receives it is
  %% stopped without acknowledging, and the same payload must then show up
  %% on the other member within 6 seconds.
  test_redispatch_qos1(_Config, AckEnabled) ->
      ok = ensure_config(sticky, AckEnabled),
      Group = <<"group1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 1}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 1}),
      Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
      emqx:publish(Message),
      Members = [ConnPid1, ConnPid2],
      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),
      %% take the first receiver down before it acknowledges
      ok = emqtt:stop(UsedSubPid1),
      Res = last_message(<<"hello1">>, Members, 6000),
      ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
      {true, UsedSubPid2} = Res,
      emqtt:stop(UsedSubPid2),
      ok.
  %% Both group members connect with clean_start = false, subscribe at QoS 1
  %% and are then stopped. The two pids still listed by
  %% emqx_shared_sub:subscribers/2 must be alive, and a QoS 1 message
  %% published afterwards must end up in the message queue of exactly one
  %% of them.
  t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
      ok = ensure_config(sticky, true),
      Group = <<"group1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      SubOpts = [{clean_start, false}],
      {ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]),
      {ok, _} = emqtt:connect(ConnPub),
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 1}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 1}),
      ok = emqtt:stop(ConnPid1),
      ok = emqtt:stop(ConnPid2),
      [Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic),
      ?assert(is_process_alive(Pid1)),
      ?assert(is_process_alive(Pid2)),
      {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
      %% NOTE(review): ?retry(100, 10, ...) presumably re-runs the check up
      %% to 10 times, 100 ms apart - confirm against the macro definition
      ?retry(
          100,
          10,
          begin
              Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
              Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
              %% the message must sit in an mqueue (because the socket is closed)
              ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2)
          end
      ),
      emqtt:stop(ConnPub),
      ok.
  %% Reads the session's message queue out of the state of the given
  %% connection process (fetched with sys:get_state/1).
  get_mqueue(ConnPid) ->
      ConnState = sys:get_state(ConnPid),
      emqx_connection:info({channel, {session, mqueue}}, ConnState).
  641. %% No ack, QoS 2 subscriptions,
  642. %% client1 receives one message, send pubrec, then suspend
  643. %% client2 acts normal (auto_ack=true)
  644. %% Expected behaviour:
  645. %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
  646. t_dispatch_qos2({init, Config}) when is_list(Config) ->
  647. ok = ensure_config(round_robin, _AckEnabled = false),
  648. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
  649. Config;
  650. t_dispatch_qos2({'end', Config}) when is_list(Config) ->
  651. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
  652. t_dispatch_qos2(Config) when is_list(Config) ->
  653. Topic = <<"foo/bar/1">>,
  654. ClientId1 = <<"ClientId1">>,
  655. ClientId2 = <<"ClientId2">>,
  656. {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
  657. {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
  658. {ok, _} = emqtt:connect(ConnPid1),
  659. {ok, _} = emqtt:connect(ConnPid2),
  660. emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
  661. emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
  662. Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
  663. Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
  664. Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
  665. Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
  666. ct:sleep(100),
  667. ok = sys:suspend(ConnPid1),
  668. %% One message is inflight
  669. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
  670. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
  671. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
  672. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
  673. %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
  674. %% on if it's picked as the first one for round_robin
  675. MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
  676. MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
  677. case MsgRec2 of
  678. <<"hello3">> ->
  679. ?assertEqual(<<"hello1">>, MsgRec1);
  680. <<"hello4">> ->
  681. ?assertEqual(<<"hello2">>, MsgRec1)
  682. end,
  683. sys:resume(ConnPid1),
  684. %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
  685. %% so it will never send PUBCOMP, hence EMQX should not attempt to send
  686. %% the 4th message yet since max_inflight is 1.
  687. MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
  688. ct:sleep(100),
  689. %% no message expected
  690. ?assertEqual([], collect_msgs(0)),
  691. %% now kill client 1
  692. kill_process(ConnPid1),
  693. %% client 2 should receive the message
  694. MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
  695. case MsgRec2 of
  696. <<"hello3">> ->
  697. ?assertEqual(<<"hello2">>, MsgRec3),
  698. ?assertEqual(<<"hello4">>, MsgRec4);
  699. <<"hello4">> ->
  700. ?assertEqual(<<"hello1">>, MsgRec3),
  701. ?assertEqual(<<"hello3">>, MsgRec4)
  702. end,
  703. emqtt:stop(ConnPid2),
  704. ok.
  705. t_dispatch_qos0({init, Config}) when is_list(Config) ->
  706. Config;
  707. t_dispatch_qos0({'end', Config}) when is_list(Config) ->
  708. ok;
  709. t_dispatch_qos0(Config) when is_list(Config) ->
  710. ok = ensure_config(round_robin, _AckEnabled = false),
  711. Topic = <<"foo/bar/1">>,
  712. ClientId1 = <<"ClientId1">>,
  713. ClientId2 = <<"ClientId2">>,
  714. {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
  715. {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
  716. {ok, _} = emqtt:connect(ConnPid1),
  717. {ok, _} = emqtt:connect(ConnPid2),
  718. %% subscribe with QoS 0
  719. emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
  720. emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
  721. %% publish with QoS 2, but should be downgraded to 0 as the subscribers
  722. %% subscribe with QoS 0
  723. Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
  724. Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
  725. Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
  726. Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
  727. ct:sleep(100),
  728. ok = sys:suspend(ConnPid1),
  729. ?assertMatch([_], emqx:publish(Message1)),
  730. ?assertMatch([_], emqx:publish(Message2)),
  731. ?assertMatch([_], emqx:publish(Message3)),
  732. ?assertMatch([_], emqx:publish(Message4)),
  733. MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
  734. MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
  735. %% assert hello2 > hello1 or hello4 > hello3
  736. ?assert(MsgRec2 > MsgRec1),
  737. kill_process(ConnPid1),
  738. %% expect no redispatch
  739. ?assertEqual([], collect_msgs(timer:seconds(2))),
  740. emqtt:stop(ConnPid2),
  741. ok.
  742. t_session_takeover({init, Config}) when is_list(Config) ->
  743. Config;
  744. t_session_takeover({'end', Config}) when is_list(Config) ->
  745. ok;
  746. t_session_takeover(Config) when is_list(Config) ->
  747. Topic = <<"t1/a">>,
  748. ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
  749. Opts = [
  750. {clientid, ClientId},
  751. {auto_ack, true},
  752. {proto_ver, v5},
  753. {clean_start, false},
  754. {properties, #{'Session-Expiry-Interval' => 60}}
  755. ],
  756. {ok, ConnPid1} = emqtt:start_link(Opts),
  757. %% with the same client ID, start another client
  758. {ok, ConnPid2} = emqtt:start_link(Opts),
  759. {ok, _} = emqtt:connect(ConnPid1),
  760. emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
  761. Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
  762. Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
  763. Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
  764. Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
  765. %% Make sure client1 is functioning
  766. ?assertMatch([_], emqx:publish(Message1)),
  767. {true, _} = last_message(<<"hello1">>, [ConnPid1]),
  768. %% Kill client1
  769. emqtt:stop(ConnPid1),
  770. %% publish another message (should end up in client1's session)
  771. ?assertMatch([_], emqx:publish(Message2)),
  772. %% connect client2 (with the same clientid)
  773. %% should trigger session take over
  774. {ok, _} = emqtt:connect(ConnPid2),
  775. ?assertMatch([_], emqx:publish(Message3)),
  776. ?assertMatch([_], emqx:publish(Message4)),
  777. {true, _} = last_message(<<"hello2">>, [ConnPid2]),
  778. %% We may or may not recv dup hello2 due to QoS1 redelivery
  779. _ = last_message(<<"hello2">>, [ConnPid2]),
  780. {true, _} = last_message(<<"hello3">>, [ConnPid2]),
  781. {true, _} = last_message(<<"hello4">>, [ConnPid2]),
  782. ?assertEqual([], collect_msgs(timer:seconds(2))),
  783. emqtt:stop(ConnPid2),
  784. ok.
  785. t_session_kicked({init, Config}) when is_list(Config) ->
  786. ok = ensure_config(round_robin, _AckEnabled = false),
  787. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
  788. Config;
  789. t_session_kicked({'end', Config}) when is_list(Config) ->
  790. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
  791. t_session_kicked(Config) when is_list(Config) ->
  792. Topic = <<"foo/bar/1">>,
  793. ClientId1 = <<"ClientId1">>,
  794. ClientId2 = <<"ClientId2">>,
  795. {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
  796. {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
  797. {ok, _} = emqtt:connect(ConnPid1),
  798. {ok, _} = emqtt:connect(ConnPid2),
  799. emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
  800. emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
  801. Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
  802. Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
  803. Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
  804. Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
  805. ct:sleep(100),
  806. ok = sys:suspend(ConnPid1),
  807. %% One message is inflight
  808. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
  809. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
  810. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
  811. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
  812. %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
  813. %% on if it's picked as the first one for round_robin
  814. MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
  815. MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
  816. ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]),
  817. case MsgRec2 of
  818. <<"hello3">> ->
  819. ?assertEqual(<<"hello1">>, MsgRec1);
  820. <<"hello4">> ->
  821. ?assertEqual(<<"hello2">>, MsgRec1)
  822. end,
  823. sys:resume(ConnPid1),
  824. %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
  825. %% so it will never send PUBCOMP, hence EMQX should not attempt to send
  826. %% the 4th message yet since max_inflight is 1.
  827. MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
  828. case MsgRec2 of
  829. <<"hello3">> ->
  830. ?assertEqual(<<"hello2">>, MsgRec3);
  831. <<"hello4">> ->
  832. ?assertEqual(<<"hello1">>, MsgRec3)
  833. end,
  834. %% no message expected
  835. ?assertEqual([], collect_msgs(0)),
  836. %% now kick client 1
  837. kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
  838. %% client 2 should NOT receive the message
  839. ?assertEqual([], collect_msgs(1000)),
  840. emqtt:stop(ConnPid2),
  841. ?assertEqual([], collect_msgs(0)),
  842. ok.
  843. -define(UPDATE_SUB_QOS(ConnPid, Topic, QoS),
  844. ?assertMatch({ok, _, [QoS]}, emqtt:subscribe(ConnPid, {Topic, QoS}))
  845. ).
  846. t_different_groups_same_topic({init, Config}) ->
  847. TestName = atom_to_binary(?FUNCTION_NAME),
  848. ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
  849. {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
  850. {ok, _} = emqtt:connect(C),
  851. [{client, C}, {clientid, ClientId} | Config];
  852. t_different_groups_same_topic({'end', Config}) ->
  853. C = ?config(client, Config),
  854. emqtt:stop(C),
  855. ok;
  856. t_different_groups_same_topic(Config) when is_list(Config) ->
  857. C = ?config(client, Config),
  858. ClientId = ?config(clientid, Config),
  859. %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
  860. GroupA = <<"aa">>,
  861. GroupB = <<"bb">>,
  862. Topic = <<"t/1">>,
  863. SharedTopicGroupA = format_share(GroupA, Topic),
  864. ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
  865. SharedTopicGroupB = format_share(GroupB, Topic),
  866. ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
  867. ?retry(
  868. _Sleep0 = 100,
  869. _Attempts0 = 50,
  870. begin
  871. ?assertEqual(2, length(emqx_router:match_routes(Topic)))
  872. end
  873. ),
  874. Message0 = emqx_message:make(ClientId, ?QOS_2, Topic, <<"hi">>),
  875. emqx:publish(Message0),
  876. ?assertMatch(
  877. [
  878. {publish, #{payload := <<"hi">>}},
  879. {publish, #{payload := <<"hi">>}}
  880. ],
  881. collect_msgs(5_000),
  882. #{routes => ets:tab2list(emqx_route)}
  883. ),
  884. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
  885. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
  886. ok.
  887. t_different_groups_update_subopts({init, Config}) ->
  888. TestName = atom_to_binary(?FUNCTION_NAME),
  889. ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
  890. {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
  891. {ok, _} = emqtt:connect(C),
  892. [{client, C}, {clientid, ClientId} | Config];
  893. t_different_groups_update_subopts({'end', Config}) ->
  894. C = ?config(client, Config),
  895. emqtt:stop(C),
  896. ok;
  897. t_different_groups_update_subopts(Config) when is_list(Config) ->
  898. C = ?config(client, Config),
  899. ClientId = ?config(clientid, Config),
  900. %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
  901. Topic = <<"t/1">>,
  902. GroupA = <<"aa">>,
  903. GroupB = <<"bb">>,
  904. SharedTopicGroupA = format_share(GroupA, Topic),
  905. SharedTopicGroupB = format_share(GroupB, Topic),
  906. Fun = fun(Group, QoS) ->
  907. ?UPDATE_SUB_QOS(C, format_share(Group, Topic), QoS),
  908. ?assertMatch(
  909. #{qos := QoS},
  910. emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
  911. )
  912. end,
  913. [Fun(Group, QoS) || QoS <- [?QOS_0, ?QOS_1, ?QOS_2], Group <- [GroupA, GroupB]],
  914. ?retry(
  915. _Sleep0 = 100,
  916. _Attempts0 = 50,
  917. begin
  918. ?assertEqual(2, length(emqx_router:match_routes(Topic)))
  919. end
  920. ),
  921. Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
  922. emqx:publish(Message0),
  923. ?assertMatch(
  924. [
  925. {publish, #{payload := <<"hi">>}},
  926. {publish, #{payload := <<"hi">>}}
  927. ],
  928. collect_msgs(5_000),
  929. #{routes => ets:tab2list(emqx_route)}
  930. ),
  931. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
  932. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
  933. ok.
  934. t_queue_subscription({init, Config}) ->
  935. TestName = atom_to_binary(?FUNCTION_NAME),
  936. ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
  937. {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
  938. {ok, _} = emqtt:connect(C),
  939. [{client, C}, {clientid, ClientId} | Config];
  940. t_queue_subscription({'end', Config}) ->
  941. C = ?config(client, Config),
  942. emqtt:stop(C),
  943. ok;
  944. t_queue_subscription(Config) when is_list(Config) ->
  945. C = ?config(client, Config),
  946. ClientId = ?config(clientid, Config),
  947. %% Subscribe and unsubscribe to both $queue share and $share/<group> with same topic
  948. Topic = <<"t/1">>,
  949. QueueTopic = <<"$queue/", Topic/binary>>,
  950. SharedTopic = <<"$share/aa/", Topic/binary>>,
  951. ?UPDATE_SUB_QOS(C, QueueTopic, ?QOS_2),
  952. ?UPDATE_SUB_QOS(C, SharedTopic, ?QOS_2),
  953. ?retry(
  954. _Sleep0 = 100,
  955. _Attempts0 = 50,
  956. begin
  957. ?assertEqual(2, length(emqx_router:match_routes(Topic)))
  958. end
  959. ),
  960. %% now publish to the underlying topic
  961. Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
  962. emqx:publish(Message0),
  963. ?assertMatch(
  964. [
  965. {publish, #{payload := <<"hi">>}},
  966. {publish, #{payload := <<"hi">>}}
  967. ],
  968. collect_msgs(5_000),
  969. #{routes => ets:tab2list(emqx_route)}
  970. ),
  971. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
  972. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopic),
  973. ?retry(
  974. _Sleep0 = 100,
  975. _Attempts0 = 50,
  976. begin
  977. ?assertEqual(0, length(emqx_router:match_routes(Topic)))
  978. end
  979. ),
  980. ct:sleep(500),
  981. Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
  982. emqx:publish(Message1),
  983. %% we should *not* receive any messages.
  984. ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
  985. ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
  989. format_share(Group, Topic) ->
  990. emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)).
  991. kill_process(Pid) ->
  992. kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
  993. kill_process(Pid, WithFun) ->
  994. _ = unlink(Pid),
  995. _ = monitor(process, Pid),
  996. _ = WithFun(Pid),
  997. receive
  998. {'DOWN', _, process, Pid, _} ->
  999. ok
  1000. after 10_000 ->
  1001. error(timeout)
  1002. end.
  1003. collect_msgs(Timeout) ->
  1004. collect_msgs([], Timeout).
  1005. collect_msgs(Acc, Timeout) ->
  1006. receive
  1007. Msg ->
  1008. collect_msgs([Msg | Acc], Timeout)
  1009. after Timeout ->
  1010. lists:reverse(Acc)
  1011. end.
  1012. ensure_config(Strategy) ->
  1013. ensure_config(Strategy, _AckEnabled = true).
  1014. ensure_config(Strategy, AckEnabled) ->
  1015. emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
  1016. emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
  1017. ok.
  1018. ensure_node_config(Node, Strategy) ->
  1019. rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]).
  1020. ensure_group_config(Group2Strategy) ->
  1021. lists:foreach(
  1022. fun({Group, Strategy}) ->
  1023. emqx_config:force_put(
  1024. [broker, shared_subscription_group, Group, strategy], Strategy, unsafe
  1025. )
  1026. end,
  1027. maps:to_list(Group2Strategy)
  1028. ).
  1029. ensure_group_config(Node, Group2Strategy) ->
  1030. lists:foreach(
  1031. fun({Group, Strategy}) ->
  1032. rpc:call(
  1033. Node,
  1034. emqx_config,
  1035. force_put,
  1036. [[broker, shared_subscription_group, Group, strategy], Strategy, unsafe]
  1037. )
  1038. end,
  1039. maps:to_list(Group2Strategy)
  1040. ).
  1041. publish_fire_and_forget(Count, Topic) when Count > 1 ->
  1042. lists:foreach(
  1043. fun(I) ->
  1044. Message = erlang:integer_to_binary(I),
  1045. {ok, PublisherPid} = emqtt:start_link(),
  1046. {ok, _} = emqtt:connect(PublisherPid),
  1047. emqtt:publish(PublisherPid, Topic, Message),
  1048. emqtt:stop(PublisherPid),
  1049. ct:sleep(50)
  1050. end,
  1051. lists:seq(0, Count - 1)
  1052. ).
  1053. subscribed(Group, Topic, Pid) ->
  1054. lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
  1055. recv_msgs(Count) ->
  1056. recv_msgs(Count, []).
  1057. recv_msgs(0, Msgs) ->
  1058. Msgs;
  1059. recv_msgs(Count, Msgs) ->
  1060. receive
  1061. {publish, Msg} ->
  1062. recv_msgs(Count - 1, [Msg | Msgs])
  1063. after 100 ->
  1064. Msgs
  1065. end.
  1066. start_peer(Name, Port) ->
  1067. {ok, Node} = emqx_cth_peer:start_link(
  1068. Name,
  1069. ebin_path()
  1070. ),
  1071. pong = net_adm:ping(Node),
  1072. setup_node(Node, Port),
  1073. Node.
  1074. stop_peer(Node) ->
  1075. rpc:call(Node, mria, leave, []),
  1076. emqx_cth_peer:stop(Node).
  1077. host() ->
  1078. [_, Host] = string:tokens(atom_to_list(node()), "@"),
  1079. Host.
  1080. ebin_path() ->
  1081. ["-pa" | code:get_path()].
  1082. setup_node(Node, Port) ->
  1083. EnvHandler =
  1084. fun(_) ->
  1085. %% We load configuration, and than set the special enviroment variable
  1086. %% which says that emqx shouldn't load configuration at startup
  1087. emqx_config:init_load(emqx_schema),
  1088. emqx_app:set_config_loader(?MODULE),
  1089. ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}),
  1090. ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}),
  1091. ok = emqx_config:put([listeners, ws, default, bind], {{127, 0, 0, 1}, Port + 3}),
  1092. ok = emqx_config:put([listeners, wss, default, bind], {{127, 0, 0, 1}, Port + 4}),
  1093. ok
  1094. end,
  1095. MyPort = Port - 1,
  1096. PeerPort = Port - 2,
  1097. ok = pair_gen_rpc(node(), MyPort, PeerPort),
  1098. ok = pair_gen_rpc(Node, PeerPort, MyPort),
  1099. %% Here we start the node and make it join the cluster
  1100. ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [[], EnvHandler]),
  1101. %% warm it up, also assert the peer ndoe name
  1102. Node = emqx_rpc:call(Node, erlang, node, []),
  1103. rpc:call(Node, mria, join, [node()]),
  1104. ok.
  1105. pair_gen_rpc(Node, LocalPort, RemotePort) ->
  1106. _ = rpc:call(Node, application, load, [gen_rpc]),
  1107. ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual]),
  1108. ok = rpc:call(Node, application, set_env, [gen_rpc, tcp_server_port, LocalPort]),
  1109. ok = rpc:call(Node, application, set_env, [gen_rpc, tcp_client_port, RemotePort]),
  1110. ok = rpc:call(Node, application, set_env, [gen_rpc, default_client_driver, tcp]),
  1111. _ = rpc:call(Node, application, stop, [gen_rpc]),
  1112. {ok, _} = rpc:call(Node, application, ensure_all_started, [gen_rpc]),
  1113. ok.