emqx_shared_sub_SUITE.erl 44 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_shared_sub_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("eunit/include/eunit.hrl").
  22. -include_lib("common_test/include/ct.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -define(SUITE, ?MODULE).
  %% Receive exactly one message and require it to match PATTERN.
  %% Evaluates to Res, which may refer to variables bound by PATTERN.
  %% Any other message, or no message within TIMEOUT milliseconds, fails the
  %% test case through ct:fail. The receive is wrapped in a fun that is
  %% applied immediately, so the variables it binds stay local to the macro.
  -define(WAIT(TIMEOUT, PATTERN, Res),
      (fun() ->
          receive
              PATTERN ->
                  Res;
              WaitUnexpectedMsg ->
                  ct:fail(#{
                      expected => ??PATTERN,
                      got => WaitUnexpectedMsg
                  })
          after TIMEOUT ->
              ct:fail({timeout, ??PATTERN})
          end
      end)()
  ).
  40. -define(ack, shared_sub_ack).
  41. -define(no_ack, no_ack).
  %% Common Test callback: the test cases to run, as computed by
  %% emqx_common_test_helpers:all/1 for this suite module.
  all() ->
      emqx_common_test_helpers:all(?SUITE).
  %% Suite setup: turn the node into a distributed one when it is not already,
  %% then boot the EMQX modules and applications.
  %% The returned config carries `dist_pid': the net_kernel pid when this suite
  %% started distribution itself, `undefined' otherwise.
  init_per_suite(Config) ->
      DistPid =
          case net_kernel:nodename() of
              ignored ->
                  %% `net_kernel:start' fails unless `epmd' is already running
                  emqx_common_test_helpers:start_epmd(),
                  {ok, NetKernelPid} = net_kernel:start(['master@127.0.0.1', longnames]),
                  ct:pal("start epmd, node name: ~p", [node()]),
                  NetKernelPid;
              _AlreadyDistributed ->
                  undefined
          end,
      emqx_common_test_helpers:boot_modules(all),
      emqx_common_test_helpers:start_apps([]),
      [{dist_pid, DistPid} | Config].
  %% Suite teardown: stop distribution only when `dist_pid' in Config is a pid
  %% (i.e. init_per_suite started it), then stop the EMQX applications.
  end_per_suite(Config) ->
      case ?config(dist_pid, Config) of
          StartedByUs when is_pid(StartedByUs) ->
              net_kernel:stop();
          _NotStartedHere ->
              ok
      end,
      emqx_common_test_helpers:stop_apps([]).
  %% Per-case setup hook: calls the test case itself with {init, Config} and
  %% uses its result as the case config. A function_clause error (e.g. the
  %% case has no such clause) leaves Config unchanged.
  init_per_testcase(Case, Config) ->
      try ?MODULE:Case({'init', Config}) of
          CaseConfig -> CaseConfig
      catch
          error:function_clause -> Config
      end.
  %% Per-case teardown hook: calls the test case itself with {'end', Config}.
  %% A function_clause error (e.g. the case has no such clause) yields ok.
  end_per_testcase(Case, Config) ->
      try ?MODULE:Case({'end', Config}) of
          CaseResult -> CaseResult
      catch
          error:function_clause -> ok
      end.
  %% A message with empty headers must not require a shared-dispatch ack.
  t_is_ack_required(Config) when is_list(Config) ->
      PlainMsg = #message{headers = #{}},
      ?assertEqual(false, emqx_shared_sub:is_ack_required(PlainMsg)).
  %% maybe_nack_dropped/1 returns false for a message without the
  %% shared_dispatch_ack header; with the header it returns true and the
  %% process named in the header receives {Ref, {shared_sub_nack, dropped}}.
  t_maybe_nack_dropped(Config) when is_list(Config) ->
      NoAckMsg = #message{headers = #{}},
      ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(NoAckMsg)),
      AckMsg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
      ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(AckMsg)),
      NackOutcome =
          receive
              {for_test, {shared_sub_nack, dropped}} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, NackOutcome).
  %% nack_no_connection/1 returns ok and the process named in the
  %% shared_dispatch_ack header receives {Ref, {shared_sub_nack, no_connection}}.
  t_nack_no_connection(Config) when is_list(Config) ->
      AckMsg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
      ?assertEqual(ok, emqx_shared_sub:nack_no_connection(AckMsg)),
      NackOutcome =
          receive
              {for_test, {shared_sub_nack, no_connection}} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, NackOutcome).
  %% maybe_ack/1 leaves a message without the ack header untouched; with the
  %% header it replaces the header value by ?no_ack and sends {Ref, ?ack} to
  %% the process named in the header.
  t_maybe_ack(Config) when is_list(Config) ->
      PlainMsg = #message{headers = #{}},
      ?assertEqual(PlainMsg, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
      AckMsg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
      ExpectedAfterAck = #message{headers = #{shared_dispatch_ack => ?no_ack}},
      ?assertEqual(ExpectedAfterAck, emqx_shared_sub:maybe_ack(AckMsg)),
      AckOutcome =
          receive
              {for_test, ?ack} -> ok
          after 100 -> timeout
          end,
      ?assertEqual(ok, AckOutcome).
  %% The test process itself share-subscribes to a topic under the 'random'
  %% strategy, publishes one QoS 2 message and expects to get it back as a
  %% `deliver' message with the same topic, sender and payload.
  t_random_basic(Config) when is_list(Config) ->
      ok = ensure_config(random),
      Group = <<"group1">>,
      ClientId = <<"ClientId">>,
      Topic = <<"foo">>,
      Payload = <<"hello">>,
      emqx:subscribe(Topic, #{qos => 2, share => Group}),
      MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
      %% give the subscription time to show up
      ct:sleep(200),
      ?assertEqual(true, subscribed(Group, Topic, self())),
      emqx:publish(MsgQoS2),
      receive
          {deliver, GotTopic, #message{from = GotFrom, payload = GotPayload}} = Delivery ->
              ct:pal("==== received: ~p", [Delivery]),
              ?assertEqual(Topic, GotTopic),
              ?assertEqual(ClientId, GotFrom),
              ?assertEqual(Payload, GotPayload)
      after 1000 ->
          ct:fail(waiting_basic_failed)
      end,
      ok.
  %% Two clients share-subscribe to "$share/g1/foo/bar" under the 'sticky'
  %% dispatch strategy. One QoS 1 message is published and exactly one
  %% delivery, carrying packet id 1, is expected.
  %%
  %% NOTE(review): the scenario this case was originally described with
  %% (close the connection of the sticky member, expect the next message to
  %% be nack'ed and redispatched to the other member, then expect random
  %% dispatch once every client is offline) is not exercised by the code
  %% below; only the first delivery is checked.
  t_no_connection_nack(Config) when is_list(Config) ->
      ok = ensure_config(sticky),
      Publisher = <<"publisher">>,
      Subscriber1 = <<"Subscriber1">>,
      Subscriber2 = <<"Subscriber2">>,
      QoS = 1,
      Group = <<"g1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, $/, Topic/binary>>,
      ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
      {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp),
      {ok, _Props1} = emqtt:connect(SubConnPid1),
      {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp),
      {ok, _Props2} = emqtt:connect(SubConnPid2),
      %% Both members must join the group; previously the first client was
      %% subscribed twice and the second one never.
      emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
      emqtt:subscribe(SubConnPid2, ShareTopic, QoS),
      %% wait for the subscriptions to show up
      ct:sleep(200),
      MkPayload = fun(PacketId) ->
          iolist_to_binary(["hello-", integer_to_list(PacketId)])
      end,
      SendF = fun(PacketId) ->
          M = emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId)),
          emqx:publish(M#message{id = PacketId})
      end,
      SendF(1),
      timer:sleep(200),
      %% Whichever connection the broker picked (sticky) for the 1st message,
      %% exactly one delivery is expected.
      ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
      ok.
  %% Two-message scenario under the 'random' strategy, dispatch ack enabled.
  t_random(Config) when is_list(Config) ->
      Strategy = random,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two-message scenario under the 'round_robin' strategy, dispatch ack enabled.
  t_round_robin(Config) when is_list(Config) ->
      Strategy = round_robin,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two-message scenario under the 'round_robin_per_group' strategy,
  %% dispatch ack enabled.
  t_round_robin_per_group(Config) when is_list(Config) ->
      Strategy = round_robin_per_group,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two subscribers in one shared group, ten messages "0".."9" published by
  %% a third client: the first subscriber must get the even payloads and the
  %% second the odd ones, each in publishing order.
  %% this would fail if executed with the standard round_robin strategy
  t_round_robin_per_group_even_distribution_one_group(Config) when is_list(Config) ->
      ok = ensure_config(round_robin_per_group, true),
      Topic = <<"foo/bar">>,
      Group = <<"group1">>,
      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, <<"C1">>}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),

      %% publisher with persistent connection
      {ok, PublisherPid} = emqtt:start_link(),
      {ok, _} = emqtt:connect(PublisherPid),
      PublishNth = fun(N) ->
          emqtt:publish(PublisherPid, Topic, erlang:integer_to_binary(N))
      end,
      lists:foreach(PublishNth, lists:seq(0, 9)),

      ToPidAndPayload = fun(#{client_pid := SubscriberPid, payload := Payload}) ->
          {SubscriberPid, Payload}
      end,
      AllReceived = lists:map(ToPidAndPayload, lists:reverse(recv_msgs(10))),
      ReceivedBy1 = [Entry || {Pid, _Payload} = Entry <- AllReceived, Pid == ConnPid1],
      ReceivedBy2 = [Entry || {Pid, _Payload} = Entry <- AllReceived, Pid == ConnPid2],

      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),
      emqtt:stop(PublisherPid),

      %% ensure each subscriber received 5 messages in alternating fashion:
      %% one receives all even and the other all uneven payloads
      ?assertEqual(
          [{ConnPid1, <<"0">>}, {ConnPid1, <<"2">>}, {ConnPid1, <<"4">>},
           {ConnPid1, <<"6">>}, {ConnPid1, <<"8">>}],
          ReceivedBy1
      ),
      ?assertEqual(
          [{ConnPid2, <<"1">>}, {ConnPid2, <<"3">>}, {ConnPid2, <<"5">>},
           {ConnPid2, <<"7">>}, {ConnPid2, <<"9">>}],
          ReceivedBy2
      ),
      ok.
  %% Four subscribers split over two shared groups on the same topic. Ten
  %% messages are published, so twenty deliveries are collected; inside each
  %% group the first subscriber must get the even payloads and the second the
  %% odd ones.
  t_round_robin_per_group_even_distribution_two_groups(Config) when is_list(Config) ->
      ok = ensure_config(round_robin_per_group, true),
      Topic = <<"foo/bar">>,
      Group1Topic = <<"$share/group1/", Topic/binary>>,
      Group2Topic = <<"$share/group2/", Topic/binary>>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, <<"C1">>}]),
      {ok, ConnPid3} = emqtt:start_link([{clientid, <<"C2">>}]),
      {ok, ConnPid4} = emqtt:start_link([{clientid, <<"C3">>}]),
      ConnPids = [ConnPid1, ConnPid2, ConnPid3, ConnPid4],
      lists:foreach(fun(ConnPid) -> emqtt:connect(ConnPid) end, ConnPids),

      %% group1 subscribers
      emqtt:subscribe(ConnPid1, {Group1Topic, 0}),
      emqtt:subscribe(ConnPid2, {Group1Topic, 0}),
      %% group2 subscribers
      emqtt:subscribe(ConnPid3, {Group2Topic, 0}),
      emqtt:subscribe(ConnPid4, {Group2Topic, 0}),

      publish_fire_and_forget(10, Topic),

      ToPidAndPayload = fun(#{client_pid := SubscriberPid, payload := Payload}) ->
          {SubscriberPid, Payload}
      end,
      AllReceived = lists:map(ToPidAndPayload, lists:reverse(recv_msgs(20))),
      ReceivedBy = fun(ConnPid) ->
          [Entry || {Pid, _Payload} = Entry <- AllReceived, Pid == ConnPid]
      end,
      ReceivedBy1 = ReceivedBy(ConnPid1),
      ReceivedBy2 = ReceivedBy(ConnPid2),
      ReceivedBy3 = ReceivedBy(ConnPid3),
      ReceivedBy4 = ReceivedBy(ConnPid4),

      lists:foreach(fun(ConnPid) -> emqtt:stop(ConnPid) end, ConnPids),

      %% ensure each subscriber received 5 messages in alternating fashion in each group
      %% subscriber 1 and 3 should receive all even messages
      %% subscriber 2 and 4 should receive all uneven messages
      ?assertEqual(
          [{ConnPid3, <<"0">>}, {ConnPid3, <<"2">>}, {ConnPid3, <<"4">>},
           {ConnPid3, <<"6">>}, {ConnPid3, <<"8">>}],
          ReceivedBy3
      ),
      ?assertEqual(
          [{ConnPid2, <<"1">>}, {ConnPid2, <<"3">>}, {ConnPid2, <<"5">>},
           {ConnPid2, <<"7">>}, {ConnPid2, <<"9">>}],
          ReceivedBy2
      ),
      ?assertEqual(
          [{ConnPid4, <<"1">>}, {ConnPid4, <<"3">>}, {ConnPid4, <<"5">>},
           {ConnPid4, <<"7">>}, {ConnPid4, <<"9">>}],
          ReceivedBy4
      ),
      ?assertEqual(
          [{ConnPid1, <<"0">>}, {ConnPid1, <<"2">>}, {ConnPid1, <<"4">>},
           {ConnPid1, <<"6">>}, {ConnPid1, <<"8">>}],
          ReceivedBy1
      ),
      ok.
  %% Two-message scenario under the 'sticky' strategy, dispatch ack enabled.
  t_sticky(Config) when is_list(Config) ->
      Strategy = sticky,
      ok = ensure_config(Strategy, true),
      test_two_messages(Strategy).
  %% Two subscribers in one shared group under the 'sticky' strategy.
  %% The one that received the first message unsubscribes; the second
  %% message must then reach the other member of the group.
  t_sticky_unsubscribe(Config) when is_list(Config) ->
      ok = ensure_config(sticky, false),
      Topic = <<"foo/bar/sticky-unsub">>,
      ClientId1 = <<"c1-sticky-unsub">>,
      ClientId2 = <<"c2-sticky-unsub">>,
      Group = <<"gsu">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      Members = [ConnPid1, ConnPid2],

      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),

      Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
      ct:sleep(100),

      emqx:publish(Message1),
      {true, FirstReceiver} = last_message(<<"hello1">>, Members),
      emqtt:unsubscribe(FirstReceiver, ShareTopic),

      emqx:publish(Message2),
      {true, SecondReceiver} = last_message(<<"hello2">>, Members),
      ?assertNotEqual(FirstReceiver, SecondReceiver),

      kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end),
      kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end),
      ok.
  %% Two-message scenario under the 'hash_clientid' strategy, dispatch ack
  %% disabled.
  t_hash(Config) when is_list(Config) ->
      Strategy = hash_clientid,
      ok = ensure_config(Strategy, false),
      test_two_messages(Strategy).
  %% Two-message scenario under the 'hash_clientid' strategy, dispatch ack
  %% disabled. Performs the same steps as t_hash/1.
  t_hash_clinetid(Config) when is_list(Config) ->
      Strategy = hash_clientid,
      ok = ensure_config(Strategy, false),
      test_two_messages(Strategy).
  %% Under the 'hash_topic' strategy, two topics whose hashes differ modulo 2
  %% (asserted up front with erlang:phash2/1) must be dispatched to different
  %% members of a two-subscriber shared group.
  %%
  %% Each delivery is matched assertively against {true, Pid}: a missing
  %% delivery fails the case right away with a badmatch instead of blocking
  %% in a receive that has no timeout.
  t_hash_topic(Config) when is_list(Config) ->
      ok = ensure_config(hash_topic, false),
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid2),

      Topic1 = <<"foo/bar1">>,
      Topic2 = <<"foo/bar2">>,
      %% the test is only meaningful when the two topics hash to different slots
      ?assert(erlang:phash2(Topic1) rem 2 =/= erlang:phash2(Topic2) rem 2),
      Message1 = emqx_message:make(ClientId1, 0, Topic1, <<"hello1">>),
      Message2 = emqx_message:make(ClientId1, 0, Topic2, <<"hello2">>),
      emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/#">>, 0}),
      emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/#">>, 0}),
      ct:sleep(100),

      emqx:publish(Message1),
      {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),

      emqx_broker:publish(Message2),
      {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),

      ?assert(UsedSubPid1 =/= UsedSubPid2),
      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),
      ok.
  %% Under the 'sticky' strategy, client C1 share-subscribes to "foo/bar" and
  %% receives a message published by C2. C1 then drops that subscription,
  %% share-subscribes to "foo/#" in the same group and must still receive the
  %% next message published to "foo/bar".
  t_not_so_sticky(Config) when is_list(Config) ->
      ok = ensure_config(sticky),
      ExactFilter = <<"$share/group1/foo/bar">>,
      WildcardFilter = <<"$share/group1/foo/#">>,
      PubTopic = <<"foo/bar">>,
      {ok, C1} = emqtt:start_link([{clientid, <<"ClientId1">>}]),
      {ok, _} = emqtt:connect(C1),
      {ok, C2} = emqtt:start_link([{clientid, <<"ClientId2">>}]),
      {ok, _} = emqtt:connect(C2),

      emqtt:subscribe(C1, {ExactFilter, 0}),
      timer:sleep(50),
      emqtt:publish(C2, PubTopic, <<"hello1">>),
      ?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),

      emqtt:unsubscribe(C1, ExactFilter),
      timer:sleep(50),
      emqtt:subscribe(C1, {WildcardFilter, 0}),
      timer:sleep(50),
      emqtt:publish(C2, PubTopic, <<"hello2">>),
      ?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),

      emqtt:disconnect(C1),
      emqtt:disconnect(C2),
      ok.
  %% Runs the two-message scenario for Strategy in the shared group "group1".
  test_two_messages(Strategy) ->
      DefaultGroup = <<"group1">>,
      test_two_messages(Strategy, DefaultGroup).
  %% Two clients share-subscribe to "foo/bar" in Group, two messages are
  %% published one after the other, and the pids that received them are
  %% compared according to Strategy:
  %%   sticky, hash_clientid               -> same receiver for both
  %%   round_robin, round_robin_per_group  -> different receivers
  %%   anything else                       -> no constraint
  test_two_messages(Strategy, Group) ->
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      Members = [ConnPid1, ConnPid2],

      emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 0}),

      Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
      ct:sleep(100),

      emqx:publish(Message1),
      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),

      emqx:publish(Message2),
      {true, UsedSubPid2} = last_message(<<"hello2">>, Members),

      emqtt:stop(ConnPid1),
      emqtt:stop(ConnPid2),

      case Strategy of
          SamePid when SamePid =:= sticky; SamePid =:= hash_clientid ->
              ?assertEqual(UsedSubPid1, UsedSubPid2);
          Alternating when Alternating =:= round_robin; Alternating =:= round_robin_per_group ->
              ?assertNotEqual(UsedSubPid1, UsedSubPid2);
          _Unconstrained ->
              ok
      end,
      ok.
  %% Same as last_message/3 with a timeout of 100 milliseconds.
  last_message(ExpectedPayload, Pids) ->
      DefaultTimeout = 100,
      last_message(ExpectedPayload, Pids, DefaultTimeout).
  %% Waits up to Timeout milliseconds for a {publish, Map} message whose
  %% payload equals ExpectedPayload.
  %% Returns {true, Pid}, where Pid is the `client_pid' from the message and
  %% is asserted to be a member of Pids. On timeout it logs "not yet" and
  %% returns the binary <<"not yet?">>.
  last_message(ExpectedPayload, Pids, Timeout) ->
      receive
          {publish, #{payload := ExpectedPayload, client_pid := ReceiverPid}} ->
              IsKnownReceiver = lists:member(ReceiverPid, Pids),
              ?assert(IsKnownReceiver),
              {true, ReceiverPid}
      after Timeout ->
          ct:pal("not yet"),
          <<"not yet?">>
      end.
  %% emqx_shared_sub:dispatch/3 returns {error, no_subscribers} while the
  %% group has no member, and {ok, 1} once the test process has
  %% share-subscribed to the topic.
  t_dispatch(Config) when is_list(Config) ->
      ok = ensure_config(random),
      Topic = <<"foo">>,
      Group = <<"group1">>,
      EmptyDelivery = #delivery{message = #message{}},
      ?assertEqual(
          {error, no_subscribers},
          emqx_shared_sub:dispatch(Group, Topic, EmptyDelivery)
      ),
      emqx:subscribe(Topic, #{qos => 2, share => Group}),
      ?assertEqual(
          {ok, 1},
          emqx_shared_sub:dispatch(Group, Topic, EmptyDelivery)
      ).
  %% Sends a call, a cast and two plain messages that emqx_shared_sub is not
  %% expected to act upon; the call must be answered with `ignored'.
  t_uncovered_func(Config) when is_list(Config) ->
      Server = emqx_shared_sub,
      ignored = gen_server:call(Server, ignored),
      ok = gen_server:cast(Server, ignored),
      ignored = Server ! ignored,
      TableEvent = {mnesia_table_event, []},
      TableEvent = Server ! TableEvent.
  %% Configures a dispatch strategy per shared group and runs the two-message
  %% scenario against the groups, in the order listed in Runs.
  t_per_group_config(Config) when is_list(Config) ->
      ok = ensure_group_config(#{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky,
          <<"round_robin_per_group_group">> => round_robin_per_group
      }),
      %% The scenarios are repeated because the random strategy may technically
      %% pass a single run: sticky and round_robin run four times each,
      %% round_robin_per_group twice.
      Runs = [
          {sticky, <<"sticky_group">>},
          {sticky, <<"sticky_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin, <<"round_robin_group">>},
          {sticky, <<"sticky_group">>},
          {sticky, <<"sticky_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin, <<"round_robin_group">>},
          {round_robin_per_group, <<"round_robin_per_group_group">>},
          {round_robin_per_group, <<"round_robin_per_group_group">>}
      ],
      lists:foreach(
          fun({Strategy, Group}) -> test_two_messages(Strategy, Group) end,
          Runs
      ).
  %% 'local' strategy across two nodes: one subscriber is connected to this
  %% node and one to a peer node (MQTT port 21999), both in "local_group".
  %% A message published on this node and a message published on the peer
  %% must be received by different subscribers, and both nodes must report
  %% `local' as the strategy of the group.
  %%
  %% The peer node is stopped in an `after' section so that a failing step
  %% does not leave it running (t_remote/1 starts a peer on the same port).
  t_local(Config) when is_list(Config) ->
      GroupConfig = #{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      },
      Node = start_slave('local_shared_sub_testtesttest', 21999),
      try
          ok = ensure_group_config(GroupConfig),
          ok = ensure_group_config(Node, GroupConfig),

          Topic = <<"local_foo/bar">>,
          ClientId1 = <<"ClientId1">>,
          ClientId2 = <<"ClientId2">>,
          {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
          %% the second client connects to the peer node's listener
          {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {port, 21999}]),
          {ok, _} = emqtt:connect(ConnPid1),
          {ok, _} = emqtt:connect(ConnPid2),

          emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),
          emqtt:subscribe(ConnPid2, {<<"$share/local_group/", Topic/binary>>, 0}),
          ct:sleep(100),

          Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),

          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),

          rpc:call(Node, emqx, publish, [Message2]),
          {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
          %% must be fetched while the peer node is still up
          RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]),

          emqtt:stop(ConnPid1),
          emqtt:stop(ConnPid2),

          ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
          ?assertEqual(local, RemoteLocalGroupStrategy),
          ?assertNotEqual(UsedSubPid1, UsedSubPid2),
          ok
      after
          stop_slave(Node)
      end.
  %% This testcase verifies dispatching of shared messages to the remote nodes
  %% via backplane API.
  %%
  %% Two EMQX nodes are used: this one and a peer with its MQTT listener on
  %% port 21999. A subscriber connects to the peer and share-subscribes; three
  %% messages with QoS 0, 1 and 2 are published on this node, and all three
  %% must arrive at that same subscriber.
  %%
  %% The messages are published with the local client id as their `from'
  %% field. The value returned by emqtt:connect/1 is not a client identity,
  %% so it is no longer used for that purpose.
  t_remote(Config) when is_list(Config) ->
      ok = ensure_config(sticky, true),
      GroupConfig = #{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      },

      Node = start_slave('remote_shared_sub_testtesttest', 21999),
      ok = ensure_group_config(GroupConfig),
      ok = ensure_group_config(Node, GroupConfig),

      Topic = <<"foo/bar">>,
      ClientIdLocal = <<"ClientId1">>,
      ClientIdRemote = <<"ClientId2">>,

      {ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]),
      {ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]),

      try
          {ok, _ConnAckLocal} = emqtt:connect(ConnPidLocal),
          {ok, _ConnAckRemote} = emqtt:connect(ConnPidRemote),

          emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}),
          ct:sleep(100),

          Message1 = emqx_message:make(ClientIdLocal, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientIdLocal, 1, Topic, <<"hello2">>),
          Message3 = emqx_message:make(ClientIdLocal, 2, Topic, <<"hello3">>),

          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]),

          %% UsedSubPid1 is already bound: the next two matches also assert
          %% that the same subscriber received the message
          emqx:publish(Message2),
          {true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]),

          emqx:publish(Message3),
          {true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]),

          ok
      after
          emqtt:stop(ConnPidLocal),
          emqtt:stop(ConnPidRemote),
          stop_slave(Node)
      end.
  %% 'local' strategy with the only subscriber connected to this node: a
  %% message published on a peer node must still reach that subscriber, the
  %% same one that receives a message published on this node.
  %%
  %% The peer node is stopped in an `after' section so that a failing step
  %% does not leave it running.
  t_local_fallback(Config) when is_list(Config) ->
      ok = ensure_group_config(#{
          <<"local_group">> => local,
          <<"round_robin_group">> => round_robin,
          <<"sticky_group">> => sticky
      }),

      Topic = <<"local_foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      Node = start_slave('local_fallback_shared_sub_test', 11888),
      try
          {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
          {ok, _} = emqtt:connect(ConnPid1),
          Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
          Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),

          emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),

          emqx:publish(Message1),
          {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),

          %% published on the peer, hence the longer wait for the delivery
          rpc:call(Node, emqx, publish, [Message2]),
          {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1], 2_000),

          emqtt:stop(ConnPid1),

          ?assertEqual(UsedSubPid1, UsedSubPid2),
          ok
      after
          stop_slave(Node)
      end.
  %% This one tests that broker tries to select another shared subscriber
  %% if the first one doesn't return an ACK (dispatch ack enabled).
  t_redispatch_qos1_with_ack(Config) when is_list(Config) ->
      AckEnabled = true,
      test_redispatch_qos1(Config, AckEnabled).
  %% Runs the QoS 1 redispatch scenario with dispatch ack disabled.
  t_redispatch_qos1_no_ack(Config) when is_list(Config) ->
      AckEnabled = false,
      test_redispatch_qos1(Config, AckEnabled).
  %% QoS 1 redispatch scenario under the 'sticky' strategy. Two clients with
  %% auto_ack disabled share-subscribe at QoS 1. The client that receives the
  %% message is stopped, and the same payload must then arrive (within 6 s)
  %% at the other client.
  test_redispatch_qos1(_Config, AckEnabled) ->
      ok = ensure_config(sticky, AckEnabled),
      Group = <<"group1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),
      Members = [ConnPid1, ConnPid2],

      emqtt:subscribe(ConnPid1, {ShareTopic, 1}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 1}),

      Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),

      emqx:publish(Message),

      {true, UsedSubPid1} = last_message(<<"hello1">>, Members),
      %% the first receiver goes away without acknowledging the message
      ok = emqtt:stop(UsedSubPid1),

      Res = last_message(<<"hello1">>, Members, 6000),
      ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),

      {true, UsedSubPid2} = Res,
      emqtt:stop(UsedSubPid2),
      ok.
  %% Two clients connect with clean_start = false, share-subscribe at QoS 1
  %% and are then stopped. Both subscriber processes reported by
  %% emqx_shared_sub:subscribers/2 must still be alive, and a QoS 1 message
  %% published afterwards must end up in the message queue of exactly one of
  %% them.
  t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
      ok = ensure_config(sticky, true),
      Group = <<"group1">>,
      Topic = <<"foo/bar">>,
      ShareTopic = <<"$share/", Group/binary, "/foo/bar">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,
      SubOpts = [{clean_start, false}],
      {ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]),
      {ok, _} = emqtt:connect(ConnPub),

      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),

      emqtt:subscribe(ConnPid1, {ShareTopic, 1}),
      emqtt:subscribe(ConnPid2, {ShareTopic, 1}),

      ok = emqtt:stop(ConnPid1),
      ok = emqtt:stop(ConnPid2),

      [Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic),
      ?assert(is_process_alive(Pid1)),
      ?assert(is_process_alive(Pid2)),

      {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
      ct:sleep(100),
      Queued = lists:append([emqx_mqueue:to_list(get_mqueue(Pid)) || Pid <- [Pid1, Pid2]]),
      %% assert the message is in mqueue (because socket is closed)
      ?assertMatch([#message{payload = <<"hello11">>}], Queued),
      emqtt:stop(ConnPub),
      ok.
  %% Reads the state of the process ConnPid with sys:get_state/1 and extracts
  %% the session message queue from it through emqx_connection:info/2.
  get_mqueue(ConnPid) ->
      ConnState = sys:get_state(ConnPid),
      emqx_connection:info({channel, {session, mqueue}}, ConnState).
  %% No ack, QoS 2 subscriptions,
  %% client1 receives one message, send pubrec, then suspend
  %% client2 acts normal (auto_ack=true)
  %% Expected behaviour:
  %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
  %%
  %% The {init, _} clause selects round_robin without ack and sets
  %% max_inflight of the default zone to 1; the {'end', _} clause sets
  %% max_inflight to 0.
  t_dispatch_qos2({init, Config}) when is_list(Config) ->
      ok = ensure_config(round_robin, _AckEnabled = false),
      emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
      Config;
  t_dispatch_qos2({'end', Config}) when is_list(Config) ->
      emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
  t_dispatch_qos2(Config) when is_list(Config) ->
      Topic = <<"foo/bar/1">>,
      ShareFilter = <<"$share/group/foo/bar/#">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,

      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),

      emqtt:subscribe(ConnPid1, {ShareFilter, 2}),
      emqtt:subscribe(ConnPid2, {ShareFilter, 2}),

      Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
      Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
      Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
      ct:sleep(100),

      ok = sys:suspend(ConnPid1),

      %% One message is inflight
      ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
      ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
      ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
      ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),

      %% assert client 2 receives two messages, they are either 1,3 or 2,4
      %% depending on if it's picked as the first one for round_robin
      FirstToC2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := Got1}}, Got1),
      SecondToC2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := Got2}}, Got2),
      case SecondToC2 of
          <<"hello3">> ->
              ?assertEqual(<<"hello1">>, FirstToC2);
          <<"hello4">> ->
              ?assertEqual(<<"hello2">>, FirstToC2)
      end,
      sys:resume(ConnPid1),
      %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
      %% so it will never send PUBCOMP, hence EMQX should not attempt to send
      %% the 4th message yet since max_inflight is 1.
      ToC1 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := Got3}}, Got3),
      ct:sleep(100),
      %% no message expected
      ?assertEqual([], collect_msgs(0)),
      %% now kill client 1
      kill_process(ConnPid1),
      %% client 2 should receive the message
      RedispatchedToC2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := Got4}}, Got4),
      case SecondToC2 of
          <<"hello3">> ->
              ?assertEqual(<<"hello2">>, ToC1),
              ?assertEqual(<<"hello4">>, RedispatchedToC2);
          <<"hello4">> ->
              ?assertEqual(<<"hello1">>, ToC1),
              ?assertEqual(<<"hello3">>, RedispatchedToC2)
      end,
      emqtt:stop(ConnPid2),
      ok.
  %% round_robin without ack, QoS 0 subscriptions. Client 1 is suspended
  %% while four messages are published. Client 2 must receive two of them in
  %% publishing order, and after client 1 is killed nothing more may arrive
  %% (no redispatch). The {init, _} and {'end', _} clauses do nothing beyond
  %% returning Config and ok.
  t_dispatch_qos0({init, Config}) when is_list(Config) ->
      Config;
  t_dispatch_qos0({'end', Config}) when is_list(Config) ->
      ok;
  t_dispatch_qos0(Config) when is_list(Config) ->
      ok = ensure_config(round_robin, _AckEnabled = false),
      Topic = <<"foo/bar/1">>,
      ShareFilter = <<"$share/group/foo/bar/#">>,
      ClientId1 = <<"ClientId1">>,
      ClientId2 = <<"ClientId2">>,

      {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
      {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
      {ok, _} = emqtt:connect(ConnPid1),
      {ok, _} = emqtt:connect(ConnPid2),

      %% subscribe with QoS 0
      emqtt:subscribe(ConnPid1, {ShareFilter, 0}),
      emqtt:subscribe(ConnPid2, {ShareFilter, 0}),

      %% publish with QoS 2, but should be downgraded to 0 as the subscribers
      %% subscribe with QoS 0
      Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
      Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
      Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
      Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
      ct:sleep(100),

      ok = sys:suspend(ConnPid1),

      ?assertMatch([_], emqx:publish(Message1)),
      ?assertMatch([_], emqx:publish(Message2)),
      ?assertMatch([_], emqx:publish(Message3)),
      ?assertMatch([_], emqx:publish(Message4)),

      FirstToC2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := Got1}}, Got1),
      SecondToC2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := Got2}}, Got2),
      %% assert hello2 > hello1 or hello4 > hello3
      ?assert(SecondToC2 > FirstToC2),

      kill_process(ConnPid1),
      %% expect no redispatch
      ?assertEqual([], collect_msgs(timer:seconds(2))),
      emqtt:stop(ConnPid2),
      ok.
  755. t_session_takeover({init, Config}) when is_list(Config) ->
  756. Config;
  757. t_session_takeover({'end', Config}) when is_list(Config) ->
  758. ok;
  759. t_session_takeover(Config) when is_list(Config) ->
  760. Topic = <<"t1/a">>,
  761. ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
  762. Opts = [
  763. {clientid, ClientId},
  764. {auto_ack, true},
  765. {proto_ver, v5},
  766. {clean_start, false},
  767. {properties, #{'Session-Expiry-Interval' => 60}}
  768. ],
  769. {ok, ConnPid1} = emqtt:start_link(Opts),
  770. %% with the same client ID, start another client
  771. {ok, ConnPid2} = emqtt:start_link(Opts),
  772. {ok, _} = emqtt:connect(ConnPid1),
  773. emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
  774. Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
  775. Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
  776. Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
  777. Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
  778. %% Make sure client1 is functioning
  779. ?assertMatch([_], emqx:publish(Message1)),
  780. {true, _} = last_message(<<"hello1">>, [ConnPid1]),
  781. %% Kill client1
  782. emqtt:stop(ConnPid1),
  783. %% publish another message (should end up in client1's session)
  784. ?assertMatch([_], emqx:publish(Message2)),
  785. %% connect client2 (with the same clientid)
  786. %% should trigger session take over
  787. {ok, _} = emqtt:connect(ConnPid2),
  788. ?assertMatch([_], emqx:publish(Message3)),
  789. ?assertMatch([_], emqx:publish(Message4)),
  790. {true, _} = last_message(<<"hello2">>, [ConnPid2]),
  791. {true, _} = last_message(<<"hello3">>, [ConnPid2]),
  792. {true, _} = last_message(<<"hello4">>, [ConnPid2]),
  793. ?assertEqual([], collect_msgs(timer:seconds(2))),
  794. emqtt:stop(ConnPid2),
  795. ok.
  796. t_session_kicked({init, Config}) when is_list(Config) ->
  797. ok = ensure_config(round_robin, _AckEnabled = false),
  798. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
  799. Config;
  800. t_session_kicked({'end', Config}) when is_list(Config) ->
  801. emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
  802. t_session_kicked(Config) when is_list(Config) ->
  803. Topic = <<"foo/bar/1">>,
  804. ClientId1 = <<"ClientId1">>,
  805. ClientId2 = <<"ClientId2">>,
  806. {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
  807. {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
  808. {ok, _} = emqtt:connect(ConnPid1),
  809. {ok, _} = emqtt:connect(ConnPid2),
  810. emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
  811. emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
  812. Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
  813. Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
  814. Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
  815. Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
  816. ct:sleep(100),
  817. ok = sys:suspend(ConnPid1),
  818. %% One message is inflight
  819. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
  820. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
  821. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
  822. ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
  823. %% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
  824. %% on if it's picked as the first one for round_robin
  825. MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
  826. MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
  827. case MsgRec2 of
  828. <<"hello3">> ->
  829. ?assertEqual(<<"hello1">>, MsgRec1);
  830. <<"hello4">> ->
  831. ?assertEqual(<<"hello2">>, MsgRec1)
  832. end,
  833. sys:resume(ConnPid1),
  834. %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
  835. %% so it will never send PUBCOMP, hence EMQX should not attempt to send
  836. %% the 4th message yet since max_inflight is 1.
  837. MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
  838. case MsgRec2 of
  839. <<"hello3">> ->
  840. ?assertEqual(<<"hello2">>, MsgRec3);
  841. <<"hello4">> ->
  842. ?assertEqual(<<"hello1">>, MsgRec3)
  843. end,
  844. %% no message expected
  845. ?assertEqual([], collect_msgs(0)),
  846. %% now kick client 1
  847. kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
  848. %% client 2 should NOT receive the message
  849. ?assertEqual([], collect_msgs(1000)),
  850. emqtt:stop(ConnPid2),
  851. ?assertEqual([], collect_msgs(0)),
  852. ok.
  853. %% FIXME: currently doesn't work
  854. %% t_different_groups_same_topic({init, Config}) ->
  855. %% TestName = atom_to_binary(?FUNCTION_NAME),
  856. %% ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
  857. %% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
  858. %% {ok, _} = emqtt:connect(C),
  859. %% [{client, C}, {clientid, ClientId} | Config];
  860. %% t_different_groups_same_topic({'end', Config}) ->
  861. %% C = ?config(client, Config),
  862. %% emqtt:stop(C),
  863. %% ok;
  864. %% t_different_groups_same_topic(Config) when is_list(Config) ->
  865. %% C = ?config(client, Config),
  866. %% ClientId = ?config(clientid, Config),
  867. %% %% Subscribe and unsubscribe to both $queue and $shared topics
  868. %% Topic = <<"t/1">>,
  869. %% SharedTopic0 = <<"$share/aa/", Topic/binary>>,
  870. %% SharedTopic1 = <<"$share/bb/", Topic/binary>>,
  871. %% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
  872. %% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
  873. %% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
  874. %% emqx:publish(Message0),
  875. %% ?assertMatch([ {publish, #{payload := <<"hi">>}}
  876. %% , {publish, #{payload := <<"hi">>}}
  877. %% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
  878. %% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
  879. %% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
  880. %% ok.
  881. t_queue_subscription({init, Config}) ->
  882. TestName = atom_to_binary(?FUNCTION_NAME),
  883. ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
  884. {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
  885. {ok, _} = emqtt:connect(C),
  886. [{client, C}, {clientid, ClientId} | Config];
  887. t_queue_subscription({'end', Config}) ->
  888. C = ?config(client, Config),
  889. emqtt:stop(C),
  890. ok;
  891. t_queue_subscription(Config) when is_list(Config) ->
  892. C = ?config(client, Config),
  893. ClientId = ?config(clientid, Config),
  894. %% Subscribe and unsubscribe to both $queue and $shared topics
  895. Topic = <<"t/1">>,
  896. QueueTopic = <<"$queue/", Topic/binary>>,
  897. SharedTopic = <<"$share/aa/", Topic/binary>>,
  898. {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
  899. {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
  900. %% FIXME: we should actually see 2 routes, one for each group
  901. %% ($queue and aa), but currently the latest subscription
  902. %% overwrites the existing one.
  903. ?retry(
  904. _Sleep0 = 100,
  905. _Attempts0 = 50,
  906. begin
  907. ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
  908. %% FIXME: should ensure we have 2 subscriptions
  909. true = emqx_router:has_routes(Topic)
  910. end
  911. ),
  912. %% now publish to the underlying topic
  913. Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
  914. emqx:publish(Message0),
  915. ?assertMatch(
  916. [
  917. {publish, #{payload := <<"hi">>}}
  918. %% FIXME: should receive one message from each group
  919. %% , {publish, #{payload := <<"hi">>}}
  920. ],
  921. collect_msgs(5_000)
  922. ),
  923. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
  924. %% FIXME: return code should be success instead of 17 ("no_subscription_existed")
  925. {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
  926. %% FIXME: this should eventually be true, but currently we leak
  927. %% the previous group subscription...
  928. %% ?retry(
  929. %% _Sleep0 = 100,
  930. %% _Attempts0 = 50,
  931. %% begin
  932. %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
  933. %% false = emqx_router:has_routes(Topic)
  934. %% end
  935. %% ),
  936. ct:sleep(500),
  937. Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
  938. emqx:publish(Message1),
  939. %% FIXME: we should *not* receive any messages...
  940. %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
  941. %% This is from the leaked group...
  942. ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
  943. routes => ets:tab2list(emqx_route)
  944. }),
  945. ok.
  946. %%--------------------------------------------------------------------
  947. %% help functions
  948. %%--------------------------------------------------------------------
  949. kill_process(Pid) ->
  950. kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
  951. kill_process(Pid, WithFun) ->
  952. _ = unlink(Pid),
  953. _ = monitor(process, Pid),
  954. _ = WithFun(Pid),
  955. receive
  956. {'DOWN', _, process, Pid, _} ->
  957. ok
  958. after 10_000 ->
  959. error(timeout)
  960. end.
  961. collect_msgs(Timeout) ->
  962. collect_msgs([], Timeout).
  963. collect_msgs(Acc, Timeout) ->
  964. receive
  965. Msg ->
  966. collect_msgs([Msg | Acc], Timeout)
  967. after Timeout ->
  968. lists:reverse(Acc)
  969. end.
  970. ensure_config(Strategy) ->
  971. ensure_config(Strategy, _AckEnabled = true).
  972. ensure_config(Strategy, AckEnabled) ->
  973. emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
  974. emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
  975. ok.
  976. ensure_node_config(Node, Strategy) ->
  977. rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]).
  978. ensure_group_config(Group2Strategy) ->
  979. lists:foreach(
  980. fun({Group, Strategy}) ->
  981. emqx_config:force_put(
  982. [broker, shared_subscription_group, Group, strategy], Strategy, unsafe
  983. )
  984. end,
  985. maps:to_list(Group2Strategy)
  986. ).
  987. ensure_group_config(Node, Group2Strategy) ->
  988. lists:foreach(
  989. fun({Group, Strategy}) ->
  990. rpc:call(
  991. Node,
  992. emqx_config,
  993. force_put,
  994. [[broker, shared_subscription_group, Group, strategy], Strategy, unsafe]
  995. )
  996. end,
  997. maps:to_list(Group2Strategy)
  998. ).
  999. publish_fire_and_forget(Count, Topic) when Count > 1 ->
  1000. lists:foreach(
  1001. fun(I) ->
  1002. Message = erlang:integer_to_binary(I),
  1003. {ok, PublisherPid} = emqtt:start_link(),
  1004. {ok, _} = emqtt:connect(PublisherPid),
  1005. emqtt:publish(PublisherPid, Topic, Message),
  1006. emqtt:stop(PublisherPid),
  1007. ct:sleep(50)
  1008. end,
  1009. lists:seq(0, Count - 1)
  1010. ).
  1011. subscribed(Group, Topic, Pid) ->
  1012. lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
  1013. recv_msgs(Count) ->
  1014. recv_msgs(Count, []).
  1015. recv_msgs(0, Msgs) ->
  1016. Msgs;
  1017. recv_msgs(Count, Msgs) ->
  1018. receive
  1019. {publish, Msg} ->
  1020. recv_msgs(Count - 1, [Msg | Msgs])
  1021. after 100 ->
  1022. Msgs
  1023. end.
  1024. start_slave(Name, Port) ->
  1025. {ok, Node} = ct_slave:start(
  1026. list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
  1027. [
  1028. {kill_if_fail, true},
  1029. {monitor_master, true},
  1030. {init_timeout, 10000},
  1031. {startup_timeout, 10000},
  1032. {erl_flags, ebin_path()}
  1033. ]
  1034. ),
  1035. pong = net_adm:ping(Node),
  1036. setup_node(Node, Port),
  1037. Node.
  1038. stop_slave(Node) ->
  1039. rpc:call(Node, mria, leave, []),
  1040. ct_slave:stop(Node).
  1041. host() ->
  1042. [_, Host] = string:tokens(atom_to_list(node()), "@"),
  1043. Host.
  1044. ebin_path() ->
  1045. string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
  1046. is_lib(Path) ->
  1047. string:prefix(Path, code:lib_dir()) =:= nomatch.
  1048. setup_node(Node, Port) ->
  1049. EnvHandler =
  1050. fun(_) ->
  1051. %% We load configuration, and than set the special enviroment variable
  1052. %% which says that emqx shouldn't load configuration at startup
  1053. emqx_config:init_load(emqx_schema),
  1054. emqx_app:set_config_loader(?MODULE),
  1055. ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}),
  1056. ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}),
  1057. ok = emqx_config:put([listeners, ws, default, bind], {{127, 0, 0, 1}, Port + 3}),
  1058. ok = emqx_config:put([listeners, wss, default, bind], {{127, 0, 0, 1}, Port + 4}),
  1059. ok
  1060. end,
  1061. %% Load env before doing anything
  1062. [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx, ekka, mria]],
  1063. %% Needs to be set explicitly because ekka:start() (which calls `gen` is called without Handler
  1064. %% in emqx_common_test_helpers:start_apps(...)
  1065. ok = rpc:call(Node, application, set_env, [gen_rpc, tcp_server_port, Port - 1]),
  1066. ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual]),
  1067. %% Here we start the node and make it join the cluster
  1068. ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [[], EnvHandler]),
  1069. rpc:call(Node, mria, join, [node()]),
  1070. ok.