فهرست منبع

fix: willmsg not published in takeover

William Yang 2 سال پیش
والد
کامیت
07eec31a8a

+ 6 - 16
apps/emqx/src/emqx_channel.erl

@@ -1134,16 +1134,14 @@ handle_call(
     kick,
     Channel = #channel{
         conn_state = ConnState,
-        will_msg = WillMsg,
-        clientinfo = ClientInfo,
         conninfo = #{proto_ver := ProtoVer}
     }
 ) ->
-    (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
+    Channel0 = maybe_publish_will_msg(Channel),
     Channel1 =
         case ConnState of
-            connected -> ensure_disconnected(kicked, Channel);
-            _ -> Channel
+            connected -> ensure_disconnected(kicked, Channel0);
+            _ -> Channel0
         end,
     case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
         true ->
@@ -1426,17 +1424,9 @@ terminate(normal, Channel) ->
     run_terminate_hook(normal, Channel);
 terminate({shutdown, kicked}, Channel) ->
     run_terminate_hook(kicked, Channel);
-terminate({shutdown, Reason}, Channel) when
-    Reason =:= discarded;
-    Reason =:= takenover
-->
-    run_terminate_hook(Reason, Channel);
-terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
-    %% since will_msg is set to undefined as soon as it is published,
-    %% if will_msg still exists when the session is terminated, it
-    %% must be published immediately.
-    WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
-    run_terminate_hook(Reason, Channel).
+terminate(Reason, Channel) ->
+    Channel1 = maybe_publish_will_msg(Channel),
+    run_terminate_hook(Reason, Channel1).
 
 run_terminate_hook(_Reason, #channel{session = undefined}) ->
     ok;

+ 8 - 1
apps/emqx/src/emqx_connection.erl

@@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
         {shutdown, Reason, Reply, OutPacket, NChannel} ->
             NState = State#state{channel = NChannel},
             ok = handle_outgoing(OutPacket, NState),
-            shutdown(Reason, Reply, NState)
+            NState2 = graceful_shutdown_transport(Reason, NState),
+            shutdown(Reason, Reply, NState2)
     end.
 
 %%--------------------------------------------------------------------
@@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) ->
             async_set_keepalive(Idle, Interval, Probes)
     end.
 
+-spec graceful_shutdown_transport(atom(), state()) -> state().
+graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) ->
+    %% TODO: Reason is currently unused; it is reserved for future use by the QUIC transport.
+    Transport:shutdown(Socket, read_write),
+    S#state{sockstate = closed}.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 4 - 0
apps/emqx/src/emqx_quic_stream.erl

@@ -32,6 +32,7 @@
     wait/1,
     getstat/2,
     fast_close/1,
+    shutdown/2,
     ensure_ok_or_exit/2,
     async_send/3,
     setopts/2,
@@ -147,6 +148,9 @@ fast_close({quic, _Conn, Stream, _Info}) ->
     % quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     ok.
 
+shutdown({quic, _Conn, Stream, _Info}, read_write) ->
+    quicer:async_shutdown_stream(Stream).
+
 -spec ensure_ok_or_exit(atom(), list(term())) -> term().
 ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
     case erlang:apply(?MODULE, Fun, Args) of

+ 1 - 0
apps/emqx/test/emqx_connection_SUITE.erl

@@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 init_per_suite(Config) ->
     %% Meck Transport
     ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end),
     %% Meck Channel
     ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
     %% Meck Cm

+ 2 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) ->
     ?assertMatch([_], emqx:publish(Message3)),
     ?assertMatch([_], emqx:publish(Message4)),
     {true, _} = last_message(<<"hello2">>, [ConnPid2]),
+    %% We may or may not receive a duplicate hello2 due to QoS 1 redelivery
+    _ = last_message(<<"hello2">>, [ConnPid2]),
     {true, _} = last_message(<<"hello3">>, [ConnPid2]),
     {true, _} = last_message(<<"hello4">>, [ConnPid2]),
     ?assertEqual([], collect_msgs(timer:seconds(2))),

+ 185 - 29
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -24,14 +24,17 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(TOPIC, <<"t">>).
 -define(CNT, 100).
 -define(SLEEP, 10).
 
 %%--------------------------------------------------------------------
 %% Initial funcs
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        {group, mqttv3},
+        {group, mqttv5}
+    ].
 
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
@@ -44,27 +47,39 @@ end_per_suite(Config) ->
     Apps = ?config(apps, Config),
     ok = emqx_cth_suite:stop(Apps),
     ok.
+
+groups() ->
+    [
+        {mqttv3, [], emqx_common_test_helpers:all(?MODULE)},
+        {mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
+    ].
+
+init_per_group(mqttv3, Config) ->
+    lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3});
+init_per_group(mqttv5, Config) ->
+    lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}).
+
+end_per_group(_Group, _Config) ->
+    ok.
+
 %%--------------------------------------------------------------------
 %% Testcases
 
-t_takeover(_) ->
+t_takeover(Config) ->
     process_flag(trap_exit, true),
-    ClientId = <<"clientid">>,
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    ClientOpts = [
+        {proto_ver, ?config(mqtt_vsn, Config)},
+        {clean_start, false}
+    ],
     Middle = ?CNT div 2,
-    Client1Msgs = messages(0, Middle),
-    Client2Msgs = messages(Middle, ?CNT div 2),
+    Client1Msgs = messages(ClientId, 0, Middle),
+    Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
     AllMsgs = Client1Msgs ++ Client2Msgs,
-
-    meck:new(emqx_cm, [non_strict, passthrough]),
-    meck:expect(emqx_cm, takeover_session_end, fun(Arg) ->
-        ok = timer:sleep(?SLEEP * 2),
-        meck:passthrough([Arg])
-    end),
-
     Commands =
-        [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
+        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
             [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
-            [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
+            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++
             [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
             [{fun stop_client/1, []}],
 
@@ -78,30 +93,144 @@ t_takeover(_) ->
     ),
 
     #{client := [CPid2, CPid1]} = FCtx,
-    ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}),
-    ?assertReceive({'EXIT', CPid2, normal}),
 
+    assert_client_exit(CPid1, ?config(mqtt_vsn, Config)),
+    ?assertReceive({'EXIT', CPid2, normal}),
     Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
     ct:pal("middle: ~p", [Middle]),
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     assert_messages_missed(AllMsgs, Received),
     assert_messages_order(AllMsgs, Received),
+    ok.
+
+t_takeover_willmsg(Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    Middle = ?CNT div 2,
+    Client1Msgs = messages(ClientId, 0, Middle),
+    Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
+    AllMsgs = Client1Msgs ++ Client2Msgs,
+    WillOpts = [
+        {proto_ver, ?config(mqtt_vsn, Config)},
+        {clean_start, false},
+        {will_topic, WillTopic},
+        {will_payload, <<"willpayload">>},
+        {will_qos, 0}
+    ],
+    Commands =
+        %% GIVEN: a client connects with a will message
+        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
+            [
+                {fun start_client/5, [
+                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
+                ]}
+            ] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
+            %% WHEN: the client reconnects with clean_start = false
+            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
+
+    FCtx = lists:foldl(
+        fun({Fun, Args}, Ctx) ->
+            ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
+            apply(Fun, [Ctx | Args])
+        end,
+        #{},
+        Commands
+    ),
+
+    #{client := [CPid2, CPidSub, CPid1]} = FCtx,
+    assert_client_exit(CPid1, ?config(mqtt_vsn, Config)),
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
+    {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
+    assert_messages_missed(AllMsgs, ReceivedNoWill),
+    assert_messages_order(AllMsgs, ReceivedNoWill),
+    %% THEN: the will message should be received
+    ?assert(IsWill),
+    emqtt:stop(CPidSub),
+    emqtt:stop(CPid2),
+    ?assertReceive({'EXIT', CPid2, normal}),
+    ?assert(not is_process_alive(CPid1)),
+    ok.
+
+t_takeover_willmsg_clean_session(Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    Middle = ?CNT div 2,
+    Client1Msgs = messages(ClientId, 0, Middle),
+    Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
+    AllMsgs = Client1Msgs ++ Client2Msgs,
+    WillOpts = [
+        {proto_ver, ?config(mqtt_vsn, Config)},
+        {clean_start, false},
+        {will_topic, WillTopic},
+        {will_payload, <<"willpayload_1">>},
+        {will_qos, 1}
+    ],
+    WillOptsClean = [
+        {proto_ver, ?config(mqtt_vsn, Config)},
+        {clean_start, true},
+        {will_topic, WillTopic},
+        {will_payload, <<"willpayload_2">>},
+        {will_qos, 1}
+    ],
 
-    meck:unload(emqx_cm),
+    Commands =
+        %% GIVEN: a client connects with will message payload <<"willpayload_1">>
+        [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
+            [
+                {fun start_client/5, [
+                    <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
+                ]}
+            ] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
+            %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">>
+            [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
+            [
+                {
+                    fun(CTX) ->
+                        timer:sleep(1000),
+                        CTX
+                    end,
+                    []
+                }
+            ],
+
+    FCtx = lists:foldl(
+        fun({Fun, Args}, Ctx) ->
+            ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
+            apply(Fun, [Ctx | Args])
+        end,
+        #{},
+        Commands
+    ),
+    #{client := [CPid2, CPidSub, CPid1]} = FCtx,
+    assert_client_exit(CPid1, ?config(mqtt_vsn, Config)),
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
+    {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>),
+    {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
+    assert_messages_missed(AllMsgs, ReceivedNoWill),
+    assert_messages_order(AllMsgs, ReceivedNoWill),
+    %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">>
+    ?assert(IsWill1),
+    ?assertNot(IsWill2),
+    emqtt:stop(CPid2),
+    emqtt:stop(CPidSub),
+    ?assert(not is_process_alive(CPid1)),
     ok.
 
-t_takover_in_cluster(_) ->
-    todo.
+%% t_takover_in_cluster(_) ->
+%%     todo.
 
 %%--------------------------------------------------------------------
 %% Commands
-
-start_client(Ctx, ClientId, Topic, Qos) ->
-    {ok, CPid} = emqtt:start_link([
-        {clientid, ClientId},
-        {proto_ver, v5},
-        {clean_start, false}
-    ]),
+start_client(Ctx, ClientId, Topic, Qos, Opts) ->
+    {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]),
     _ = erlang:spawn_link(fun() ->
         {ok, _} = emqtt:connect(CPid),
         ct:pal("CLIENT: connected ~p", [CPid]),
@@ -157,8 +286,8 @@ assert_messages_order([Msg | Expected], Received) ->
             assert_messages_order(Expected, Rest)
     end.
 
-messages(Offset, Cnt) ->
-    [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
+messages(Topic, Offset, Cnt) ->
+    [emqx_message:make(ct, ?QOS_1, Topic, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
 
 payload(I) ->
     % NOTE
@@ -170,3 +299,30 @@ payload(I) ->
             emqx_utils_calendar:now_to_rfc3339(millisecond)
         ])
     ).
+
+%% @doc Remove every message whose payload equals the target payload from the list.
+%%      Returns `{IsTargetFound, ListOfOtherMessages}'.
+%% @end
+-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) ->
+    {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}.
+filter_payload(List, Payload) when is_binary(Payload) ->
+    IsWill = lists:any(
+        fun
+            (#{payload := P}) -> Payload == P;
+            (_) -> false
+        end,
+        List
+    ),
+    Filtered = [
+        Msg
+     || #{payload := P} = Msg <- List,
+        P =/= Payload
+    ],
+    {IsWill, Filtered}.
+
+%% @doc Assert that the emqtt *client* process exits with the reason expected for the given MQTT protocol version.
+assert_client_exit(Pid, v5) ->
+    %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3]
+    ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}});
+assert_client_exit(Pid, v3) ->
+    ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}).

+ 2 - 0
changes/ce/fix-11868.en.md

@@ -0,0 +1,2 @@
+Fixed a bug where the will message was not published after a session takeover.
+