Jelajahi Sumber

Improve emqx_pmon module and add more test cases

Feng Lee 6 tahun lalu
induk
melakukan
a2d5b834da

+ 25 - 24
src/emqx_pmon.erl

@@ -36,47 +36,49 @@
 
 -opaque(pmon() :: {?MODULE, map()}).
 
+-define(PMON(Map), {?MODULE, Map}).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
 -spec(new() -> pmon()).
-new() ->
-    {?MODULE, maps:new()}.
+new() -> ?PMON(maps:new()).
 
 -spec(monitor(pid(), pmon()) -> pmon()).
-monitor(Pid, PM) ->
-    ?MODULE:monitor(Pid, undefined, PM).
+monitor(Pid, PMon) ->
+    ?MODULE:monitor(Pid, undefined, PMon).
 
 -spec(monitor(pid(), term(), pmon()) -> pmon()).
-monitor(Pid, Val, {?MODULE, PM}) ->
-    {?MODULE, case maps:is_key(Pid, PM) of
-                  true  -> PM;
-                  false -> Ref = erlang:monitor(process, Pid),
-                           maps:put(Pid, {Ref, Val}, PM)
-              end}.
+monitor(Pid, Val, PMon = ?PMON(Map)) ->
+    case maps:is_key(Pid, Map) of
+        true  -> PMon;
+        false ->
+            Ref = erlang:monitor(process, Pid),
+            ?PMON(maps:put(Pid, {Ref, Val}, Map))
+    end.
 
 -spec(demonitor(pid(), pmon()) -> pmon()).
-demonitor(Pid, {?MODULE, PM}) ->
-    {?MODULE, case maps:find(Pid, PM) of
-                  {ok, {Ref, _Val}} ->
-                      %% flush
-                      _ = erlang:demonitor(Ref, [flush]),
-                      maps:remove(Pid, PM);
-                  error -> PM
-              end}.
+demonitor(Pid, PMon = ?PMON(Map)) ->
+    case maps:find(Pid, Map) of
+        {ok, {Ref, _Val}} ->
+            %% flush
+            _ = erlang:demonitor(Ref, [flush]),
+            ?PMON(maps:remove(Pid, Map));
+        error -> PMon
+    end.
 
 -spec(find(pid(), pmon()) -> error | {ok, term()}).
-find(Pid, {?MODULE, PM}) ->
-    case maps:find(Pid, PM) of
+find(Pid, ?PMON(Map)) ->
+    case maps:find(Pid, Map) of
         {ok, {_Ref, Val}} ->
             {ok, Val};
         error -> error
     end.
 
 -spec(erase(pid(), pmon()) -> pmon()).
-erase(Pid, {?MODULE, PM}) ->
-    {?MODULE, maps:remove(Pid, PM)}.
+erase(Pid, ?PMON(Map)) ->
+    ?PMON(maps:remove(Pid, Map)).
 
 -spec(erase_all([pid()], pmon()) -> {[{pid(), term()}], pmon()}).
 erase_all(Pids, PMon0) ->
@@ -90,6 +92,5 @@ erase_all(Pids, PMon0) ->
       end, {[], PMon0}, Pids).
 
 -spec(count(pmon()) -> non_neg_integer()).
-count({?MODULE, PM}) ->
-    maps:size(PM).
+count(?PMON(Map)) -> maps:size(Map).
 

+ 103 - 0
test/emqx_alarm_handler_SUITE.erl

@@ -0,0 +1,103 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_alarm_handler_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx.hrl").
+-include("emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([], fun set_special_configs/1),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+set_special_configs(emqx) ->
+    AclFile = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"),
+    application:set_env(emqx, acl_file, AclFile);
+set_special_configs(_App) -> ok.
+
+t_alarm_handler(_) ->
+    with_connection(
+        fun(Sock) ->
+            emqx_client_sock:send(Sock,
+                                  raw_send_serialize(
+                                      ?CONNECT_PACKET(
+                                          #mqtt_packet_connect{
+                                          proto_ver = ?MQTT_PROTO_V5})
+                                   )),
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data),
+
+            Topic1 = emqx_topic:systop(<<"alarms/alert">>),
+            Topic2 = emqx_topic:systop(<<"alarms/clear">>),
+            SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
+            emqx_client_sock:send(Sock,
+                                  raw_send_serialize(
+                                      ?SUBSCRIBE_PACKET(
+                                          1,
+                                          [{Topic1, SubOpts},
+                                           {Topic2, SubOpts}])
+                                      )),
+
+            {ok, Data2} = gen_tcp:recv(Sock, 0),
+            {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2),
+
+            alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test,
+                                                            severity = error,
+                                                            title="alarm title",
+                                                            summary="alarm summary"
+                                                           }}),
+
+            {ok, Data3} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3),
+
+            ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
+
+            alarm_handler:clear_alarm(alarm_for_test),
+
+            {ok, Data4} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4),
+
+            ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms()))
+
+        end).
+
+with_connection(DoFun) ->
+    {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
+                                          [binary, {packet, raw}, {active, false}],
+                                          3000),
+    try
+        DoFun(Sock)
+    after
+        emqx_client_sock:close(Sock)
+    end.
+
+raw_send_serialize(Packet) ->
+    emqx_frame:serialize(Packet, ?MQTT_PROTO_V5).
+
+raw_recv_parse(Bin) ->
+    emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ?MQTT_PROTO_V5})).
+

+ 57 - 0
test/emqx_channel_SUITE.erl

@@ -0,0 +1,57 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_channel_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_basic(_) ->
+    Topic = <<"TopicA">>,
+    {ok, C} = emqtt:start_link([{port, 1883}]),
+    {ok, _} = emqtt:ws_connect(C),
+    {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
+    {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    ?assertEqual(3, length(recv_msgs(3))),
+    ok = emqtt:disconnect(C).
+
+recv_msgs(Count) ->
+    recv_msgs(Count, []).
+
+recv_msgs(0, Msgs) ->
+    Msgs;
+recv_msgs(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            recv_msgs(Count-1, [Msg|Msgs])
+    after 100 ->
+        Msgs
+    end.
+

+ 259 - 0
test/emqx_client_SUITE.erl

@@ -0,0 +1,259 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_client_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(lists, [nth/2]).
+
+-include("emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(TOPICS, [<<"TopicA">>,
+                 <<"TopicA/B">>,
+                 <<"Topic/C">>,
+                 <<"TopicA/C">>,
+                 <<"/TopicA">>
+                ]).
+
+-define(WILD_TOPICS, [<<"TopicA/+">>,
+                      <<"+/C">>,
+                      <<"#">>,
+                      <<"/#">>,
+                      <<"/+">>,
+                      <<"+/+">>,
+                      <<"TopicA/#">>
+                     ]).
+
+
+all() ->
+    [{group, mqttv3},
+     {group, mqttv4},
+     {group, mqttv5}
+    ].
+
+groups() ->
+    [{mqttv3, [non_parallel_tests],
+      [t_basic_v3
+      ]},
+     {mqttv4, [non_parallel_tests],
+      [t_basic_v4,
+       t_will_message,
+       %% t_offline_message_queueing,
+       t_overlapping_subscriptions,
+       %% t_keepalive,
+       %% t_redelivery_on_reconnect,
+       %% subscribe_failure_test,
+       t_dollar_topics_test
+      ]},
+     {mqttv5, [non_parallel_tests],
+      [t_basic_with_props_v5
+      ]}
+    ].
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+%%--------------------------------------------------------------------
+%% Test cases for MQTT v3
+%%--------------------------------------------------------------------
+
+t_basic_v3(_) ->
+    t_basic([{proto_ver, v3}]).
+
+%%--------------------------------------------------------------------
+%% Test cases for MQTT v4
+%%--------------------------------------------------------------------
+
+t_basic_v4(_Config) ->
+    t_basic([{proto_ver, v4}]).
+
+t_will_message(_Config) ->
+    {ok, C1} = emqx_client:start_link([{clean_start, true},
+                                       {will_topic, nth(3, ?TOPICS)},
+                                       {will_payload, <<"client disconnected">>},
+                                       {keepalive, 1}]),
+    {ok, _} = emqx_client:connect(C1),
+
+    {ok, C2} = emqx_client:start_link(),
+    {ok, _} = emqx_client:connect(C2),
+
+    {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
+    timer:sleep(5),
+    ok = emqx_client:stop(C1),
+    timer:sleep(5),
+    ?assertEqual(1, length(recv_msgs(1))),
+    ok = emqx_client:disconnect(C2),
+    ct:pal("Will message test succeeded").
+
+t_offline_message_queueing(_) ->
+    {ok, C1} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c1">>}]),
+    {ok, _} = emqx_client:connect(C1),
+
+    {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
+    ok = emqx_client:disconnect(C1),
+    {ok, C2} = emqx_client:start_link([{clean_start, true},
+                                       {client_id, <<"c2">>}]),
+    {ok, _} = emqx_client:connect(C2),
+
+    ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
+    {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
+    {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
+    timer:sleep(10),
+    emqx_client:disconnect(C2),
+    {ok, C3} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c1">>}]),
+    {ok, _} = emqx_client:connect(C3),
+
+    timer:sleep(10),
+    emqx_client:disconnect(C3),
+    ?assertEqual(3, length(recv_msgs(3))).
+
+t_overlapping_subscriptions(_) ->
+    {ok, C} = emqx_client:start_link([]),
+    {ok, _} = emqx_client:connect(C),
+
+    {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
+                                                {nth(1, ?WILD_TOPICS), 1}]),
+    timer:sleep(10),
+    {ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
+    timer:sleep(10),
+
+    Num = length(recv_msgs(2)),
+    ?assert(lists:member(Num, [1, 2])),
+    if
+        Num == 1 ->
+            ct:pal("This server is publishing one message for all
+                   matching overlapping subscriptions, not one for each.");
+        Num == 2 ->
+            ct:pal("This server is publishing one message per each
+                    matching overlapping subscription.");
+        true -> ok
+    end,
+    emqx_client:disconnect(C).
+
+%% t_keepalive_test(_) ->
+%%     ct:print("Keepalive test starting"),
+%%     {ok, C1, _} = emqx_client:start_link([{clean_start, true},
+%%                                           {keepalive, 5},
+%%                                           {will_flag, true},
+%%                                           {will_topic, nth(5, ?TOPICS)},
+%%                                           %% {will_qos, 2},
+%%                                           {will_payload, <<"keepalive expiry">>}]),
+%%     ok = emqx_client:pause(C1),
+%%     {ok, C2, _} = emqx_client:start_link([{clean_start, true},
+%%                                           {keepalive, 0}]),
+%%     {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
+%%     ok = emqx_client:disconnect(C2),
+%%     ?assertEqual(1, length(recv_msgs(1))),
+%%     ct:print("Keepalive test succeeded").
+
+t_redelivery_on_reconnect(_) ->
+    ct:pal("Redelivery on reconnect test starting"),
+    {ok, C1} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c">>}]),
+    {ok, _} = emqx_client:connect(C1),
+
+    {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
+    timer:sleep(10),
+    ok = emqx_client:pause(C1),
+    {ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>,
+                                  [{qos, 1}, {retain, false}]),
+    {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
+                                  [{qos, 2}, {retain, false}]),
+    timer:sleep(10),
+    ok = emqx_client:disconnect(C1),
+    ?assertEqual(0, length(recv_msgs(2))),
+    {ok, C2} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c">>}]),
+    {ok, _} = emqx_client:connect(C2),
+
+    timer:sleep(10),
+    ok = emqx_client:disconnect(C2),
+    ?assertEqual(2, length(recv_msgs(2))).
+
+%% t_subscribe_sys_topics(_) ->
+%%     ct:print("Subscribe failure test starting"),
+%%     {ok, C, _} = emqx_client:start_link([]),
+%%     {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
+%%     timer:sleep(10),
+%%     ct:print("Subscribe failure test succeeded").
+
+t_dollar_topics(_) ->
+    ct:pal("$ topics test starting"),
+    {ok, C} = emqx_client:start_link([{clean_start, true},
+                                      {keepalive, 0}]),
+    {ok, _} = emqx_client:connect(C),
+
+    {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
+    {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
+                                  <<"test">>, [{qos, 1}, {retain, false}]),
+    timer:sleep(10),
+    ?assertEqual(0, length(recv_msgs(1))),
+    ok = emqx_client:disconnect(C),
+    ct:pal("$ topics test succeeded").
+
+%%--------------------------------------------------------------------
+%% Test cases for MQTT v5
+%%--------------------------------------------------------------------
+
+t_basic_with_props_v5(_) ->
+    t_basic([{proto_ver, v5},
+             {properties, #{'Receive-Maximum' => 4}}
+            ]).
+
+%%--------------------------------------------------------------------
+%% General test cases.
+%%--------------------------------------------------------------------
+
+t_basic(Opts) ->
+    Topic = nth(1, ?TOPICS),
+    {ok, C} = emqx_client:start_link([{proto_ver, v4}]),
+    {ok, _} = emqx_client:connect(C),
+    {ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
+    {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
+    {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
+    ?assertEqual(3, length(recv_msgs(3))),
+    ok = emqx_client:disconnect(C).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+recv_msgs(Count) ->
+    recv_msgs(Count, []).
+
+recv_msgs(0, Msgs) ->
+    Msgs;
+recv_msgs(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            recv_msgs(Count-1, [Msg|Msgs]);
+        _Other -> recv_msgs(Count, Msgs) %%TODO:: remove the branch?
+    after 100 ->
+        Msgs
+    end.
+

+ 56 - 0
test/emqx_flapping_SUITE.erl

@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_flapping_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    prepare_for_test(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_flapping(_Config) ->
+    process_flag(trap_exit, true),
+    flapping_connect(5),
+    {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
+    {error, _} = emqx_client:connect(C),
+    receive
+        {'EXIT', Client, _Reason} ->
+            ct:log("receive exit signal, Client: ~p", [Client])
+    after 1000 ->
+            ct:log("timeout")
+    end.
+
+flapping_connect(Times) ->
+    lists:foreach(fun do_connect/1, lists:seq(1, Times)).
+
+do_connect(_I) ->
+    {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
+    {ok, _} = emqx_client:connect(C),
+    ok = emqx_client:disconnect(C).
+
+prepare_for_test() ->
+    emqx_zone:set_env(external, enable_flapping_detect, true),
+    emqx_zone:set_env(external, flapping_threshold, {10, 60}),
+    emqx_zone:set_env(external, flapping_expiry_interval, 3600).
+

+ 10 - 2
test/emqx_gc_SUITE.erl

@@ -33,6 +33,9 @@ t_init(_) ->
     ?assertEqual(#{cnt => {10, 10}, oct => {10, 10}}, emqx_gc:info(GC3)).
 
 t_run(_) ->
+    Undefined = emqx_gc:init(false),
+    ?assertEqual(undefined, Undefined),
+    ?assertEqual({false, undefined}, emqx_gc:run(1, 1, Undefined)),
     GC = emqx_gc:init(#{count => 10, bytes => 10}),
     ?assertEqual({true, GC}, emqx_gc:run(1, 1000, GC)),
     ?assertEqual({true, GC}, emqx_gc:run(1000, 1, GC)),
@@ -42,7 +45,10 @@ t_run(_) ->
     ?assertEqual(#{cnt => {10, 7}, oct => {10, 7}}, emqx_gc:info(GC2)),
     {false, GC3} = emqx_gc:run(3, 3, GC2),
     ?assertEqual(#{cnt => {10, 4}, oct => {10, 4}}, emqx_gc:info(GC3)),
-    ?assertEqual({true, GC}, emqx_gc:run(4, 4, GC3)).
+    ?assertEqual({true, GC}, emqx_gc:run(4, 4, GC3)),
+    %% Disabled?
+    DisabledGC = emqx_gc:init(#{count => 0, bytes => 0}),
+    ?assertEqual({false, DisabledGC}, emqx_gc:run(1, 1, DisabledGC)).
 
 t_info(_) ->
     ?assertEqual(undefined, emqx_gc:info(undefined)),
@@ -54,5 +60,7 @@ t_reset(_) ->
     GC = emqx_gc:init(#{count => 10, bytes => 10}),
     {false, GC1} = emqx_gc:run(5, 5, GC),
     ?assertEqual(#{cnt => {10, 5}, oct => {10, 5}}, emqx_gc:info(GC1)),
-    ?assertEqual(GC, emqx_gc:reset(GC1)).
+    ?assertEqual(GC, emqx_gc:reset(GC1)),
+    DisabledGC = emqx_gc:init(#{count => 0, bytes => 0}),
+    ?assertEqual(DisabledGC, emqx_gc:reset(DisabledGC)).
 

+ 81 - 0
test/emqx_mod_rewrite_SUITE.erl

@@ -0,0 +1,81 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mod_rewrite_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(emqx_mod_rewrite,
+        [ rewrite_subscribe/4
+        , rewrite_unsubscribe/4
+        , rewrite_publish/2
+        ]).
+
+-include_lib("emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TEST_RULES, [<<"x/# ^x/y/(.+)$ z/y/$1">>,
+                     <<"y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2">>
+                    ]).
+
+all() -> emqx_ct:all(?MODULE).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_rewrite_subscribe(_) ->
+    ?assertEqual({ok, [{<<"test">>, #{}}]},
+                 rewrite(subscribe, [{<<"test">>, #{}}])),
+    ?assertEqual({ok, [{<<"z/y/test">>, #{}}]},
+                 rewrite(subscribe, [{<<"x/y/test">>, #{}}])),
+    ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]},
+                 rewrite(subscribe, [{<<"y/test/z/test_topic">>, #{}}])).
+
+t_rewrite_unsubscribe(_) ->
+    ?assertEqual({ok, [{<<"test">>, #{}}]},
+                 rewrite(unsubscribe, [{<<"test">>, #{}}])),
+    ?assertEqual({ok, [{<<"z/y/test">>, #{}}]},
+                 rewrite(unsubscribe, [{<<"x/y/test">>, #{}}])),
+    ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]},
+                 rewrite(unsubscribe, [{<<"y/test/z/test_topic">>, #{}}])).
+
+t_rewrite_publish(_) ->
+    ?assertMatch({ok, #message{topic = <<"test">>}},
+                 rewrite(publish, #message{topic = <<"test">>})),
+    ?assertMatch({ok, #message{topic = <<"z/y/test">>}},
+                 rewrite(publish, #message{topic = <<"x/y/test">>})),
+    ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}},
+                 rewrite(publish, #message{topic = <<"y/test/z/test_topic">>})).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+rewrite(subscribe, TopicFilters) ->
+    rewrite_subscribe(#{}, #{}, TopicFilters, rules());
+rewrite(unsubscribe, TopicFilters) ->
+    rewrite_unsubscribe(#{}, #{}, TopicFilters, rules());
+rewrite(publish, Msg) -> rewrite_publish(Msg, rules()).
+
+rules() ->
+    [begin
+         [Topic, Re, Dest] = string:split(Rule, " ", all),
+         {ok, MP} = re:compile(Re),
+         {rewrite, Topic, MP, Dest}
+     end || Rule <- ?TEST_RULES].
+

+ 4 - 0
test/emqx_pmon_SUITE.erl

@@ -28,14 +28,17 @@ t_monitor(_) ->
     PMon1 = emqx_pmon:monitor(self(), PMon),
     ?assertEqual(1, emqx_pmon:count(PMon1)),
     PMon2 = emqx_pmon:demonitor(self(), PMon1),
+    PMon2 = emqx_pmon:demonitor(self(), PMon2),
     ?assertEqual(0, emqx_pmon:count(PMon2)).
 
 t_find(_) ->
     PMon = emqx_pmon:new(),
     PMon1 = emqx_pmon:monitor(self(), val, PMon),
+    PMon1 = emqx_pmon:monitor(self(), val, PMon1),
     ?assertEqual(1, emqx_pmon:count(PMon1)),
     ?assertEqual({ok, val}, emqx_pmon:find(self(), PMon1)),
     PMon2 = emqx_pmon:erase(self(), PMon1),
+    PMon2 = emqx_pmon:erase(self(), PMon1),
     ?assertEqual(error, emqx_pmon:find(self(), PMon2)).
 
 t_erase(_) ->
@@ -44,6 +47,7 @@ t_erase(_) ->
     PMon2 = emqx_pmon:erase(self(), PMon1),
     ?assertEqual(0, emqx_pmon:count(PMon2)),
     {Items, PMon3} = emqx_pmon:erase_all([self()], PMon1),
+    {[], PMon3} = emqx_pmon:erase_all([self()], PMon3),
     ?assertEqual([{self(), val}], Items),
     ?assertEqual(0, emqx_pmon:count(PMon3)).
 

+ 144 - 0
test/emqx_protocol_SUITE.erl

@@ -0,0 +1,144 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_protocol_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(emqx_protocol,
+        [ handle_in/2
+        , handle_out/2
+        ]).
+
+-include("emqx.hrl").
+-include("emqx_mqtt.hrl").
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_in
+%%--------------------------------------------------------------------
+
+t_handle_in_connect(_) ->
+    'TODO'.
+
+t_handle_in_publish(_) ->
+    'TODO'.
+
+t_handle_in_puback(_) ->
+    'TODO'.
+
+t_handle_in_pubrec(_) ->
+    'TODO'.
+
+t_handle_in_pubrel(_) ->
+    'TODO'.
+
+t_handle_in_pubcomp(_) ->
+    'TODO'.
+
+t_handle_in_subscribe(_) ->
+    'TODO'.
+
+t_handle_in_unsubscribe(_) ->
+    'TODO'.
+
+t_handle_in_pingreq(_) ->
+    with_proto(fun(PState) ->
+                       {ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState)
+               end).
+
+t_handle_in_disconnect(_) ->
+    'TODO'.
+
+t_handle_in_auth(_) ->
+    'TODO'.
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_deliver
+%%--------------------------------------------------------------------
+
+t_handle_deliver(_) ->
+    'TODO'.
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_out
+%%--------------------------------------------------------------------
+
+t_handle_out_conack(_) ->
+    'TODO'.
+
+t_handle_out_publish(_) ->
+    'TODO'.
+
+t_handle_out_puback(_) ->
+    'TODO'.
+
+t_handle_out_pubrec(_) ->
+    'TODO'.
+
+t_handle_out_pubrel(_) ->
+    'TODO'.
+
+t_handle_out_pubcomp(_) ->
+    'TODO'.
+
+t_handle_out_suback(_) ->
+    'TODO'.
+
+t_handle_out_unsuback(_) ->
+    'TODO'.
+
+t_handle_out_disconnect(_) ->
+    'TODO'.
+
+t_handle_out_auth(_) ->
+    'TODO'.
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_timeout
+%%--------------------------------------------------------------------
+
+t_handle_timeout(_) ->
+    'TODO'.
+
+%%--------------------------------------------------------------------
+%% Test cases for terminate
+%%--------------------------------------------------------------------
+
+t_terminate(_) ->
+    'TODO'.
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+with_proto(Fun) ->
+    Fun(emqx_protocol:init(#{peername => {{127,0,0,1}, 3456},
+                             sockname => {{127,0,0,1}, 1883},
+                             conn_mod => emqx_channel},
+                           #{zone => ?MODULE})).
+

+ 251 - 0
test/emqx_shared_sub_SUITE.erl

@@ -0,0 +1,251 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_shared_sub_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(SUITE, ?MODULE).
+-define(wait(For, Timeout),
+        emqx_ct_helpers:wait_for(
+          ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
+
+all() -> emqx_ct:all(?SUITE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_random_basic(_) ->
+    ok = ensure_config(random),
+    ClientId = <<"ClientId">>,
+    {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
+    {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
+    Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>),
+    emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(subscribed(<<"group1">>, <<"foo">>, SPid), 1000),
+    PacketId = 1,
+    emqx_session:publish(SPid, PacketId, Message1),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid) of
+              [{publish, 1, _}] -> true;
+              Other -> Other
+          end, 1000),
+    emqx_session:pubrec(SPid, PacketId, reasoncode),
+    emqx_session:pubcomp(SPid, PacketId, reasoncode),
+    emqx_mock_client:close_session(ConnPid),
+    ok.
+
+%% Start two subscribers share subscribe to "$share/g1/foo/bar"
+%% Set 'sticky' dispatch strategy, send 1st message to find
+%% out which member it picked, then close its connection
+%% send the second message, the message should be 'nack'ed
+%% by the sticky session and delivered to the 2nd session.
+%% After the connection for the 2nd session is also closed,
+%% i.e. when all clients are offline, the following message(s)
+%% should be delivered randomly.
+t_no_connection_nack(_) ->
+    ok = ensure_config(sticky),
+    Publisher = <<"publisher">>,
+    Subscriber1 = <<"Subscriber1">>,
+    Subscriber2 = <<"Subscriber2">>,
+    QoS = 1,
+    Group = <<"g1">>,
+    Topic = <<"foo/bar">>,
+    {ok, PubConnPid} = emqx_mock_client:start_link(Publisher),
+    {ok, SubConnPid1} = emqx_mock_client:start_link(Subscriber1),
+    {ok, SubConnPid2} = emqx_mock_client:start_link(Subscriber2),
+    %% allow session to persist after connection shutdown
+    Attrs = #{expiry_interval => timer:seconds(30)},
+    {ok, P_Pid} = emqx_mock_client:open_session(PubConnPid, Publisher, internal, Attrs),
+    {ok, SPid1} = emqx_mock_client:open_session(SubConnPid1, Subscriber1, internal, Attrs),
+    {ok, SPid2} = emqx_mock_client:open_session(SubConnPid2, Subscriber2, internal, Attrs),
+    emqx_session:subscribe(SPid1, [{Topic, #{qos => QoS, share => Group}}]),
+    emqx_session:subscribe(SPid2, [{Topic, #{qos => QoS, share => Group}}]),
+    %% wait for the subscriptions to show up
+    ?wait(subscribed(Group, Topic, SPid1), 1000),
+    ?wait(subscribed(Group, Topic, SPid2), 1000),
+    MkPayload = fun(PacketId) -> iolist_to_binary(["hello-", integer_to_list(PacketId)]) end,
+    SendF = fun(PacketId) -> emqx_session:publish(P_Pid, PacketId, emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId))) end,
+    SendF(1),
+    Ref = make_ref(),
+    CasePid = self(),
+    Received =
+        fun(PacketId, ConnPid) ->
+                Payload = MkPayload(PacketId),
+                case emqx_mock_client:get_last_message(ConnPid) of
+                    [{publish, _, #message{payload = Payload}}] ->
+                        CasePid ! {Ref, PacketId, ConnPid},
+                        true;
+                    _Other ->
+                        false
+                end
+        end,
+    ?wait(Received(1, SubConnPid1) orelse Received(1, SubConnPid2), 1000),
+    %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
+    ConnPid = receive {Ref, 1, Pid} -> Pid after 1000 -> error(timeout) end,
+    %% Now kill the connection, expect all following messages to be delivered to the other subscriber.
+    emqx_mock_client:stop(ConnPid),
+    %% sleep then make synced calls to session processes to ensure that
+    %% the connection pid's 'EXIT' message is propagated to the session process
+    %% also to be sure sessions are still alive
+    timer:sleep(2),
+    _ = emqx_session:info(SPid1),
+    _ = emqx_session:info(SPid2),
+    %% Now we know what is the other still alive connection
+    [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
+    %% Send some more messages
+    PacketIdList = lists:seq(2, 10),
+    lists:foreach(fun(Id) ->
+                          SendF(Id),
+                          ?wait(Received(Id, TheOtherConnPid), 1000)
+                  end, PacketIdList),
+    %% Now close the 2nd (last connection)
+    emqx_mock_client:stop(TheOtherConnPid),
+    timer:sleep(2),
+    %% both sessions should have conn_pid = undefined
+    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
+    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
+    %% send more messages, but all should be queued in session state
+    lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
+    {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
+    {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
+    ?assertEqual(length(PacketIdList), L1 + L2),
+    %% clean up
+    emqx_mock_client:close_session(PubConnPid),
+    emqx_sm:close_session(SPid1),
+    emqx_sm:close_session(SPid2),
+    ok.
+
+t_random(_) ->
+    test_two_messages(random).
+
+t_round_robin(_) ->
+    test_two_messages(round_robin).
+
+t_sticky(_) ->
+    test_two_messages(sticky).
+
+t_hash(_) ->
+    test_two_messages(hash, false).
+
+%% if the original subscriber dies, change to another one alive
+t_not_so_sticky(_) ->
+    ok = ensure_config(sticky),
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
+    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
+    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
+    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
+    Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
+    emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(subscribed(<<"group1">>, <<"foo/bar">>, SPid1), 1000),
+    emqx_session:publish(SPid1, 1, Message1),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid1) of
+              [{publish, _, #message{payload = <<"hello1">>}}] -> true;
+              Other -> Other
+          end, 1000),
+    emqx_mock_client:close_session(ConnPid1),
+    ?wait(not subscribed(<<"group1">>, <<"foo/bar">>, SPid1), 1000),
+    emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
+    ?wait(subscribed(<<"group1">>, <<"foo/#">>, SPid2), 1000),
+    emqx_session:publish(SPid2, 2, Message2),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid2) of
+              [{publish, _, #message{payload = <<"hello2">>}}] -> true;
+              Other -> Other
+          end, 1000),
+    emqx_mock_client:close_session(ConnPid2),
+    ?wait(not subscribed(<<"group1">>, <<"foo/#">>, SPid2), 1000),
+    ok.
+
+test_two_messages(Strategy) ->
+    test_two_messages(Strategy, _WithAck = true).
+
+test_two_messages(Strategy, WithAck) ->
+    ok = ensure_config(Strategy, WithAck),
+    Topic = <<"foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
+    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
+    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
+    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
+    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
+    emqx_session:subscribe(SPid1, [{Topic, #{qos => 0, share => <<"group1">>}}]),
+    emqx_session:subscribe(SPid2, [{Topic, #{qos => 0, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(subscribed(<<"group1">>, Topic, SPid1) andalso
+          subscribed(<<"group1">>, Topic, SPid2), 1000),
+    emqx_broker:publish(Message1),
+    Me = self(),
+    WaitF = fun(ExpectedPayload) ->
+                    case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
+                        {true, Pid} ->
+                            Me ! {subscriber, Pid},
+                            true;
+                        Other ->
+                            Other
+                    end
+            end,
+    ?wait(WaitF(<<"hello1">>), 2000),
+    UsedSubPid1 = receive {subscriber, P1} -> P1 end,
+    emqx_broker:publish(Message2),
+    ?wait(WaitF(<<"hello2">>), 2000),
+    UsedSubPid2 = receive {subscriber, P2} -> P2 end,
+    case Strategy of
+        sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
+        round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
+        hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
+        _ -> ok
+    end,
+    emqx_mock_client:close_session(ConnPid1),
+    emqx_mock_client:close_session(ConnPid2),
+    ok.
+
+last_message(_ExpectedPayload, []) -> <<"not yet?">>;
+last_message(ExpectedPayload, [Pid | Pids]) ->
+    case emqx_mock_client:get_last_message(Pid) of
+        [{publish, _, #message{payload = ExpectedPayload}}] -> {true, Pid};
+        _Other -> last_message(ExpectedPayload, Pids)
+    end.
+
+%%--------------------------------------------------------------------
+%% help functions
+%%--------------------------------------------------------------------
+
+ensure_config(Strategy) ->
+    ensure_config(Strategy, _AckEnabled = true).
+
+ensure_config(Strategy, AckEnabled) ->
+    application:set_env(emqx, shared_subscription_strategy, Strategy),
+    application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
+    ok.
+
+subscribed(Group, Topic, Pid) ->
+    lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
+

+ 57 - 0
test/emqx_ws_channel_SUITE.erl

@@ -0,0 +1,57 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ws_channel_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
+t_basic(_) ->
+    Topic = <<"TopicA">>,
+    {ok, C} = emqtt:start_link([{port, 8083}]),
+    {ok, _} = emqtt:ws_connect(C),
+    {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
+    {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
+    ?assertEqual(3, length(recv_msgs(3))),
+    ok = emqx_client:disconnect(C).
+
+recv_msgs(Count) ->
+    recv_msgs(Count, []).
+
+recv_msgs(0, Msgs) ->
+    Msgs;
+recv_msgs(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            recv_msgs(Count-1, [Msg|Msgs])
+    after 100 ->
+        Msgs
+    end.
+