소스 검색

fix(shared): re-dispatch inflight (QoS1) and mqueue messages

when session terminates (not due to take over)
shared delivery should be re-dispatched to other members
in the group
Zaiming (Stone) Shi 3 년 전
부모
커밋
e8279a02ef
5개의 변경된 파일270개의 추가작업 그리고 93개의 파일을 삭제
  1. 1 0
      CHANGES-5.0.md
  2. 18 4
      apps/emqx/src/emqx_mqueue.erl
  3. 16 24
      apps/emqx/src/emqx_session.erl
  4. 51 2
      apps/emqx/src/emqx_shared_sub.erl
  5. 184 63
      apps/emqx/test/emqx_shared_sub_SUITE.erl

+ 1 - 0
CHANGES-5.0.md

@@ -10,6 +10,7 @@
 * Fix GET /listeners API crash When some nodes still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002)
 * Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963)
 * Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986)
+* Redispatch shared subscription messages.
 
 # 5.0.8
 

+ 18 - 4
apps/emqx/src/emqx_mqueue.erl

@@ -66,7 +66,8 @@
     in/2,
     out/1,
     stats/1,
-    dropped/1
+    dropped/1,
+    to_list/1
 ]).
 
 -define(NO_PRIORITY_TABLE, disabled).
@@ -109,7 +110,7 @@
     dropped = 0 :: count(),
     p_table = ?NO_PRIORITY_TABLE :: p_table(),
     default_p = ?LOWEST_PRIORITY :: priority(),
-    q = ?PQUEUE:new() :: pq(),
+    q = emqx_pqueue:new() :: pq(),
     shift_opts :: #shift_opts{},
     last_prio :: non_neg_integer() | undefined,
     p_credit :: non_neg_integer() | undefined
@@ -118,7 +119,7 @@
 -type mqueue() :: #mqueue{}.
 
 -spec init(options()) -> mqueue().
-init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
+init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) ->
     MaxLen =
         case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
             true -> MaxLen0;
@@ -126,7 +127,7 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
         end,
     #mqueue{
         max_len = MaxLen,
-        store_qos0 = QoS_0,
+        store_qos0 = Qos0,
         p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
         default_p = get_priority_opt(Opts),
         shift_opts = get_shift_opt(Opts)
@@ -152,6 +153,19 @@ len(#mqueue{len = Len}) -> Len.
 
 max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
 
+%% @doc Return all queued items in a list.
+-spec to_list(mqueue()) -> list().
+to_list(MQ) ->
+    to_list(MQ, []).
+
+to_list(MQ, Acc) ->
+    case out(MQ) of
+        {empty, _MQ} ->
+            lists:reverse(Acc);
+        {{value, Msg}, Q1} ->
+            to_list(Q1, [Msg | Acc])
+    end.
+
 %% @doc Return number of dropped messages.
 -spec dropped(mqueue()) -> count().
 dropped(#mqueue{dropped = Dropped}) -> Dropped.

+ 16 - 24
apps/emqx/src/emqx_session.erl

@@ -801,7 +801,8 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
 -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
 terminate(ClientInfo, Reason, Session) ->
     run_terminate_hooks(ClientInfo, Reason, Session),
-    redispatch_shared_messages(Session),
+    Reason =/= takenover andalso
+        redispatch_shared_messages(Session),
     ok.
 
 run_terminate_hooks(ClientInfo, discarded, Session) ->
@@ -811,29 +812,20 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
 run_terminate_hooks(ClientInfo, Reason, Session) ->
     run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
 
-redispatch_shared_messages(#session{inflight = Inflight}) ->
-    InflightList = emqx_inflight:to_list(Inflight),
-    lists:foreach(
-        fun
-            %% Only QoS1 messages get redispatched, because QoS2 messages
-            %% must be sent to the same client, once they're in flight
-            ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) ->
-                ?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg});
-            ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) ->
-                case emqx_shared_sub:get_group(Msg) of
-                    {ok, Group} ->
-                        %% Note that dispatch is called with self() in failed subs
-                        %% This is done to avoid dispatching back to caller
-                        Delivery = #delivery{sender = self(), message = Msg},
-                        emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
-                    _ ->
-                        false
-                end;
-            (_) ->
-                ok
-        end,
-        InflightList
-    ).
+redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
+    AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
+    F = fun
+        ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
+            %% For QoS 2, here is what the spec says:
+            %% If the Client's Session terminates before the Client reconnects,
+            %% the Server MUST NOT send the Application Message to any other
+            %% subscribed Client [MQTT-4.8.2-5].
+            {true, Msg};
+        ({_PacketId, #inflight_data{}}) ->
+            false
+    end,
+    InflightList = lists:filtermap(F, AllInflights),
+    emqx_shared_sub:redispatch(InflightList ++ emqx_mqueue:to_list(Q)).
 
 -compile({inline, [run_hook/2]}).
 run_hook(Name, Args) ->

+ 51 - 2
apps/emqx/src/emqx_shared_sub.erl

@@ -39,7 +39,8 @@
 -export([
     dispatch/3,
     dispatch/4,
-    do_dispatch_with_ack/4
+    do_dispatch_with_ack/4,
+    redispatch/1
 ]).
 
 -export([
@@ -96,6 +97,9 @@
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
+-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
+
+-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
 
 -record(state, {pmon}).
 
@@ -144,7 +148,8 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
         false ->
             {error, no_subscribers};
         {Type, SubPid} ->
-            case do_dispatch(SubPid, Group, Topic, Msg, Type) of
+            Msg1 = with_redispatch_to(Msg, Group, Topic),
+            case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
                 ok ->
                     {ok, 1};
                 {error, _Reason} ->
@@ -223,6 +228,50 @@ without_group_ack(Msg) ->
 get_group_ack(Msg) ->
     emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
 
+with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
+    Msg;
+with_redispatch_to(Msg, Group, Topic) ->
+    emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
+
+%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
+is_redispatch_needed(#message{} = Msg) ->
+    case get_redispatch_to(Msg) of
+        ?REDISPATCH_TO(_, _) ->
+            true;
+        _ ->
+            false
+    end.
+
+%% @doc Redispatch shared deliveries to other members in the group.
+redispatch(Messages0) ->
+    Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
+    case length(Messages) of
+        L when L > 0 ->
+            ?SLOG(info, #{
+                msg => "redispatching_shared_subscription_message",
+                count => L
+            }),
+            lists:foreach(fun redispatch_shared_message/1, Messages);
+        _ ->
+            ok
+    end.
+
+redispatch_shared_message(#message{} = Msg) ->
+    %% As long as it's still a #message{} record in inflight,
+    %% we should try to re-dispatch
+    ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
+    %% Note that dispatch is called with self() in failed subs
+    %% This is done to avoid dispatching back to caller
+    Delivery = #delivery{sender = self(), message = Msg},
+    dispatch(Group, Topic, Delivery, [self()]).
+
+%% @hidden Return the `redispatch_to` group-topic in the message header.
+%% `false` is returned if the message is not a shared dispatch.
+%% or when it's a QoS 0 message.
+-spec get_redispatch_to(emqx_types:message()) -> redispatch_to() | false.
+get_redispatch_to(Msg) ->
+    emqx_message:get_header(redispatch_to, Msg, false).
+
 -spec is_ack_required(emqx_types:message()) -> boolean().
 is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
 

+ 184 - 63
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -25,10 +25,20 @@
 
 -define(SUITE, ?MODULE).
 
--define(wait(For, Timeout),
-    emqx_common_test_helpers:wait_for(
-        ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout
-    )
+-define(WAIT(TIMEOUT, PATTERN, Res),
+    (fun() ->
+        receive
+            PATTERN ->
+                Res;
+            Other ->
+                ct:fail(#{
+                    expected => ??PATTERN,
+                    got => Other
+                })
+        after TIMEOUT ->
+            ct:fail({timeout, ??PATTERN})
+        end
+    end)()
 ).
 
 -define(ack, shared_sub_ack).
@@ -45,10 +55,26 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([]).
 
-t_is_ack_required(_) ->
+init_per_testcase(Case, Config) ->
+    try
+        ?MODULE:Case({'init', Config})
+    catch
+        error:function_clause ->
+            Config
+    end.
+
+end_per_testcase(Case, Config) ->
+    try
+        ?MODULE:Case({'end', Config})
+    catch
+        error:function_clause ->
+            ok
+    end.
+
+t_is_ack_required(Config) when is_list(Config) ->
     ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
 
-t_maybe_nack_dropped(_) ->
+t_maybe_nack_dropped(Config) when is_list(Config) ->
     ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
     Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
     ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
@@ -60,7 +86,7 @@ t_maybe_nack_dropped(_) ->
         end
     ).
 
-t_nack_no_connection(_) ->
+t_nack_no_connection(Config) when is_list(Config) ->
     Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
     ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
     ?assertEqual(
@@ -71,7 +97,7 @@ t_nack_no_connection(_) ->
         end
     ).
 
-t_maybe_ack(_) ->
+t_maybe_ack(Config) when is_list(Config) ->
     ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
     Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
     ?assertEqual(
@@ -86,10 +112,7 @@ t_maybe_ack(_) ->
         end
     ).
 
-% t_subscribers(_) ->
-%     error('TODO').
-
-t_random_basic(_) ->
+t_random_basic(Config) when is_list(Config) ->
     ok = ensure_config(random),
     ClientId = <<"ClientId">>,
     Topic = <<"foo">>,
@@ -121,7 +144,7 @@ t_random_basic(_) ->
 %% After the connection for the 2nd session is also closed,
 %% i.e. when all clients are offline, the following message(s)
 %% should be delivered randomly.
-t_no_connection_nack(_) ->
+t_no_connection_nack(Config) when is_list(Config) ->
     ok = ensure_config(sticky),
     Publisher = <<"publisher">>,
     Subscriber1 = <<"Subscriber1">>,
@@ -153,54 +176,22 @@ t_no_connection_nack(_) ->
     %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
 
     ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
-    %% Now kill the connection, expect all following messages to be delivered to the other
-    %% subscriber.
-    %emqx_mock_client:stop(ConnPid),
-    %% sleep then make synced calls to session processes to ensure that
-    %% the connection pid's 'EXIT' message is propagated to the session process
-    %% also to be sure sessions are still alive
-    %   timer:sleep(2),
-    %   _ = emqx_session:info(SPid1),
-    %   _ = emqx_session:info(SPid2),
-    %   %% Now we know what is the other still alive connection
-    %   [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
-    %   %% Send some more messages
-    %   PacketIdList = lists:seq(2, 10),
-    %   lists:foreach(fun(Id) ->
-    %                         SendF(Id),
-    %                         ?wait(Received(Id, TheOtherConnPid), 1000)
-    %                 end, PacketIdList),
-    %   %% Now close the 2nd (last connection)
-    %   emqx_mock_client:stop(TheOtherConnPid),
-    %   timer:sleep(2),
-    %   %% both sessions should have conn_pid = undefined
-    %   ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
-    %   ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
-    %   %% send more messages, but all should be queued in session state
-    %   lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
-    %   {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
-    %   {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
-    %   ?assertEqual(length(PacketIdList), L1 + L2),
-    %   %% clean up
-    %   emqx_mock_client:close_session(PubConnPid),
-    %   emqx_sm:close_session(SPid1),
-    %   emqx_sm:close_session(SPid2),
     ok.
 
-t_random(_) ->
+t_random(Config) when is_list(Config) ->
     ok = ensure_config(random, true),
     test_two_messages(random).
 
-t_round_robin(_) ->
+t_round_robin(Config) when is_list(Config) ->
     ok = ensure_config(round_robin, true),
     test_two_messages(round_robin).
 
-t_round_robin_per_group(_) ->
+t_round_robin_per_group(Config) when is_list(Config) ->
     ok = ensure_config(round_robin_per_group, true),
     test_two_messages(round_robin_per_group).
 
 %% this would fail if executed with the standard round_robin strategy
-t_round_robin_per_group_even_distribution_one_group(_) ->
+t_round_robin_per_group_even_distribution_one_group(Config) when is_list(Config) ->
     ok = ensure_config(round_robin_per_group, true),
     Topic = <<"foo/bar">>,
     Group = <<"group1">>,
@@ -264,7 +255,7 @@ t_round_robin_per_group_even_distribution_one_group(_) ->
     ),
     ok.
 
-t_round_robin_per_group_even_distribution_two_groups(_) ->
+t_round_robin_per_group_even_distribution_two_groups(Config) when is_list(Config) ->
     ok = ensure_config(round_robin_per_group, true),
     Topic = <<"foo/bar">>,
     {ok, ConnPid1} = emqtt:start_link([{clientid, <<"C0">>}]),
@@ -350,19 +341,19 @@ t_round_robin_per_group_even_distribution_two_groups(_) ->
     ),
     ok.
 
-t_sticky(_) ->
+t_sticky(Config) when is_list(Config) ->
     ok = ensure_config(sticky, true),
     test_two_messages(sticky).
 
-t_hash(_) ->
+t_hash(Config) when is_list(Config) ->
     ok = ensure_config(hash, false),
     test_two_messages(hash).
 
-t_hash_clinetid(_) ->
+t_hash_clinetid(Config) when is_list(Config) ->
     ok = ensure_config(hash_clientid, false),
     test_two_messages(hash_clientid).
 
-t_hash_topic(_) ->
+t_hash_topic(Config) when is_list(Config) ->
     ok = ensure_config(hash_topic, false),
     ClientId1 = <<"ClientId1">>,
     ClientId2 = <<"ClientId2">>,
@@ -407,7 +398,7 @@ t_hash_topic(_) ->
     ok.
 
 %% if the original subscriber dies, change to another one alive
-t_not_so_sticky(_) ->
+t_not_so_sticky(Config) when is_list(Config) ->
     ok = ensure_config(sticky),
     ClientId1 = <<"ClientId1">>,
     ClientId2 = <<"ClientId2">>,
@@ -481,7 +472,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
         <<"not yet?">>
     end.
 
-t_dispatch(_) ->
+t_dispatch(Config) when is_list(Config) ->
     ok = ensure_config(random),
     Topic = <<"foo">>,
     ?assertEqual(
@@ -494,13 +485,13 @@ t_dispatch(_) ->
         emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
     ).
 
-t_uncovered_func(_) ->
+t_uncovered_func(Config) when is_list(Config) ->
     ignored = gen_server:call(emqx_shared_sub, ignored),
     ok = gen_server:cast(emqx_shared_sub, ignored),
     ignored = emqx_shared_sub ! ignored,
     {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
 
-t_per_group_config(_) ->
+t_per_group_config(Config) when is_list(Config) ->
     ok = ensure_group_config(#{
         <<"local_group">> => local,
         <<"round_robin_group">> => round_robin,
@@ -521,7 +512,7 @@ t_per_group_config(_) ->
     test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>),
     test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>).
 
-t_local(_) ->
+t_local(Config) when is_list(Config) ->
     GroupConfig = #{
         <<"local_group">> => local,
         <<"round_robin_group">> => round_robin,
@@ -567,7 +558,7 @@ t_local(_) ->
     ?assertNotEqual(UsedSubPid1, UsedSubPid2),
     ok.
 
-t_remote(_) ->
+t_remote(Config) when is_list(Config) ->
     %% This testcase verifies dispatching of shared messages to the remote nodes via backplane API.
     %%
     %% In this testcase we start two EMQX nodes: local and remote.
@@ -620,7 +611,7 @@ t_remote(_) ->
         stop_slave(Node)
     end.
 
-t_local_fallback(_) ->
+t_local_fallback(Config) when is_list(Config) ->
     ok = ensure_group_config(#{
         <<"local_group">> => local,
         <<"round_robin_group">> => round_robin,
@@ -653,9 +644,14 @@ t_local_fallback(_) ->
 
 %% This one tests that broker tries to select another shared subscriber
 %% If the first one doesn't return an ACK
-t_redispatch(_) ->
-    ok = ensure_config(sticky, true),
+t_redispatch_qos1_with_ack(Config) when is_list(Config) ->
+    test_redispatch_qos1(Config, true).
+
+t_redispatch_qos1_no_ack(Config) when is_list(Config) ->
+    test_redispatch_qos1(Config, false).
 
+test_redispatch_qos1(_Config, AckEnabled) ->
+    ok = ensure_config(sticky, AckEnabled),
     Group = <<"group1">>,
     Topic = <<"foo/bar">>,
     ClientId1 = <<"ClientId1">>,
@@ -682,10 +678,135 @@ t_redispatch(_) ->
     emqtt:stop(UsedSubPid2),
     ok.
 
+%% No ack, QoS 2 subscriptions,
+%% client1 receives one message, send pubrec, then suspend
+%% client2 acts normal (auto_ack=true)
+%% Expected behaviour:
+%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
+t_dispatch_qos2({init, Config}) when is_list(Config) ->
+    emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
+    Config;
+t_dispatch_qos2({'end', Config}) when is_list(Config) ->
+    emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
+t_dispatch_qos2(Config) when is_list(Config) ->
+    ok = ensure_config(round_robin, _AckEnabled = false),
+    Topic = <<"foo/bar/1">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
+    emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
+
+    Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
+    Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
+    Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
+    ct:sleep(100),
+
+    ok = sys:suspend(ConnPid1),
+
+    %% One message is inflight
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
+
+    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
+    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
+    %% assert hello2 > hello1 or hello4 > hello3
+    ?assert(MsgRec2 > MsgRec1),
+
+    sys:resume(ConnPid1),
+    %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
+    %% so it will never send PUBCOMP, hence EMQX should not attempt to send
+    %% the 4th message yet since max_inflight is 1.
+    MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
+    ct:sleep(100),
+    %% no message expected
+    ?assertEqual([], collect_msgs(0)),
+    %% now kill client 1
+    kill_process(ConnPid1),
+    %% client 2 should receive the message
+    MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
+    %% assert hello2 > hello1 or hello4 > hello3
+    ?assert(MsgRec4 > MsgRec3),
+    emqtt:stop(ConnPid2),
+    ok.
+
+t_dispatch_qos0({init, Config}) when is_list(Config) ->
+    Config;
+t_dispatch_qos0({'end', Config}) when is_list(Config) ->
+    ok;
+t_dispatch_qos0(Config) when is_list(Config) ->
+    ok = ensure_config(round_robin, _AckEnabled = false),
+    Topic = <<"foo/bar/1">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    %% subscribe with QoS 0
+    emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
+    emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
+
+    %% publish with QoS 2, but should be downgraded to 0 as the subscribers
+    %% subscribe with QoS 0
+    Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
+    Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
+    Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
+    ct:sleep(100),
+
+    ok = sys:suspend(ConnPid1),
+
+    ?assertMatch([_], emqx:publish(Message1)),
+    ?assertMatch([_], emqx:publish(Message2)),
+    ?assertMatch([_], emqx:publish(Message3)),
+    ?assertMatch([_], emqx:publish(Message4)),
+
+    MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
+    MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
+    %% assert hello2 > hello1 or hello4 > hello3
+    ?assert(MsgRec2 > MsgRec1),
+
+    kill_process(ConnPid1),
+    %% expect no redispatch
+    ?assertEqual([], collect_msgs(timer:seconds(2))),
+    emqtt:stop(ConnPid2),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------
 
+kill_process(Pid) ->
+    _ = unlink(Pid),
+    _ = monitor(process, Pid),
+    erlang:exit(Pid, kill),
+    receive
+        {'DOWN', _, process, Pid, _} ->
+            ok
+    end.
+
+collect_msgs(Timeout) ->
+    collect_msgs([], Timeout).
+
+collect_msgs(Acc, Timeout) ->
+    receive
+        Msg ->
+            collect_msgs([Msg | Acc], Timeout)
+    after Timeout ->
+        lists:reverse(Acc)
+    end.
+
 ensure_config(Strategy) ->
     ensure_config(Strategy, _AckEnabled = true).