فهرست منبع

test(shared): add a test case to ensure shared dispatch retry

to ensure that the retry does not enter an infinite loop
Zaiming (Stone) Shi 3 سال پیش
والد
کامیت
1c29e2806a

+ 1 - 1
CHANGES-5.0.md

@@ -10,7 +10,7 @@
 * Fix GET /listeners API crash when some nodes are still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002)
 * Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963)
 * Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986)
-* Redispatch shared subscription messages.
+* Redispatch shared subscription messages. [#9104](https://github.com/emqx/emqx/pull/9104)
 
 # 5.0.8
 

+ 12 - 2
apps/emqx/src/emqx_channel.erl

@@ -997,8 +997,13 @@ maybe_nack(Delivers) ->
     lists:filter(fun not_nacked/1, Delivers).
 
 not_nacked({deliver, _Topic, Msg}) ->
-    not (emqx_shared_sub:is_ack_required(Msg) andalso
-        (ok == emqx_shared_sub:nack_no_connection(Msg))).
+    case emqx_shared_sub:is_ack_required(Msg) of
+        true ->
+            ok = emqx_shared_sub:nack_no_connection(Msg),
+            false;
+        false ->
+            true
+    end.
 
 maybe_mark_as_delivered(Session, Delivers) ->
     case emqx_session:info(is_persistent, Session) of
@@ -1222,6 +1227,8 @@ handle_call(
     ChanInfo1 = info(NChannel),
     emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
     reply(ok, reset_timer(alive_timer, NChannel));
+handle_call(get_mqueue, Channel) ->
+    reply({ok, get_mqueue(Channel)}, Channel);
 handle_call(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     reply(ignored, Channel).
@@ -2224,3 +2231,6 @@ get_mqtt_conf(Zone, Key, Default) ->
 set_field(Name, Value, Channel) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
     setelement(Pos + 1, Channel, Value).
+
+get_mqueue(#channel{session = Session}) ->
+    emqx_session:get_mqueue(Session).

+ 5 - 1
apps/emqx/src/emqx_session.erl

@@ -60,7 +60,8 @@
     info/2,
     is_session/1,
     stats/1,
-    obtain_next_pkt_id/1
+    obtain_next_pkt_id/1,
+    get_mqueue/1
 ]).
 
 -export([
@@ -917,3 +918,6 @@ age(Now, Ts) -> Now - Ts.
 set_field(Name, Value, Session) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, session)),
     setelement(Pos + 1, Session, Value).
+
+get_mqueue(#session{mqueue = Q}) ->
+    emqx_mqueue:to_list(Q).

+ 1 - 9
apps/emqx/src/emqx_shared_sub.erl

@@ -47,8 +47,7 @@
     maybe_ack/1,
     maybe_nack_dropped/1,
     nack_no_connection/1,
-    is_ack_required/1,
-    get_group/1
+    is_ack_required/1
 ]).
 
 %% for testing
@@ -275,13 +274,6 @@ get_redispatch_to(Msg) ->
 -spec is_ack_required(emqx_types:message()) -> boolean().
 is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
 
--spec get_group(emqx_types:message()) -> {ok, any()} | error.
-get_group(Msg) ->
-    case get_group_ack(Msg) of
-        ?NO_ACK -> error;
-        {Group, _Sender, _Ref} -> {ok, Group}
-    end.
-
 %% @doc Negative ack dropped message due to inflight window or message queue being full.
 -spec maybe_nack_dropped(emqx_types:message()) -> boolean().
 maybe_nack_dropped(Msg) ->

+ 34 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -678,6 +678,40 @@ test_redispatch_qos1(_Config, AckEnabled) ->
     emqtt:stop(UsedSubPid2),
     ok.
 
+t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
+    ok = ensure_config(sticky, true),
+    Group = <<"group1">>,
+    Topic = <<"foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    SubOpts = [{clean_start, false}],
+    {ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]),
+    {ok, _} = emqtt:connect(ConnPub),
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
+    emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
+
+    ok = emqtt:stop(ConnPid1),
+    ok = emqtt:stop(ConnPid2),
+
+    [Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic),
+    ?assert(is_process_alive(Pid1)),
+    ?assert(is_process_alive(Pid2)),
+
+    {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
+    ct:sleep(100),
+    {ok, Msgs1} = gen_server:call(Pid1, get_mqueue),
+    {ok, Msgs2} = gen_server:call(Pid2, get_mqueue),
+    %% assert the message is in mqueue (because socket is closed)
+    ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
+    emqtt:stop(ConnPub),
+    ok.
+
 %% No ack, QoS 2 subscriptions,
 %% client1 receives one message, send pubrec, then suspend
 %% client2 acts normal (auto_ack=true)