Browse Source

fix(sn): fix clean_session=false willmsgs not sent

Turtle 4 years ago
parent
commit
9dc1e162fa
2 changed files with 54 additions and 9 deletions
  1. 10 9
      apps/emqx_sn/src/emqx_sn_gateway.erl
  2. 44 0
      apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

+ 10 - 9
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -610,8 +610,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
             stop(Reason, Reply, State#state{channel = NChannel})
     end.
 
-handle_info(Info, State = #state{channel = Channel}) ->
-   handle_return(emqx_channel:handle_info(Info, Channel), State).
+handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
+    maybe_send_will_msg(Reason, State),
+    handle_return(emqx_channel:handle_info(Info, Channel), State).
 
 handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
     handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State).
@@ -782,21 +783,21 @@ stop({shutdown, Reason}, State) ->
     stop(Reason, State);
 stop(Reason, State) ->
     ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
-    case Reason of
-        %% FIXME: The Will-Msg should publish when a Session terminated!
-        Reason when Reason =:= normal ->
-            ok;
-        _ ->
-            do_publish_will(State)
-    end,
+    maybe_send_will_msg(Reason, State),
     {stop, {shutdown, Reason}, State}.
 
 stop({shutdown, Reason}, Reply, State) ->
     stop(Reason, Reply, State);
 stop(Reason, Reply, State) ->
     ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
+    maybe_send_will_msg(Reason, State),
     {stop, {shutdown, Reason}, Reply, State}.
 
+maybe_send_will_msg(normal, _State) ->
+    ok;
+maybe_send_will_msg(_Reason, State) ->
+    do_publish_will(State).
+
 stop_log_level(Reason) when ?is_non_error_reason(Reason) ->
     debug;
 stop_log_level(_) ->

+ 44 - 0
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -941,6 +941,41 @@ t_will_test5(_) ->
 
     gen_udp:close(Socket).
 
+t_will_case06(_) ->
+    QoS = 1,
+    Duration = 1,
+    WillMsg = <<10, 11, 12, 13, 14>>,
+    WillTopic = <<"abc">>,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    ClientId = <<"test">>,
+
+    ok = emqx_broker:subscribe(WillTopic),
+
+    send_connect_msg_with_will1(Socket, Duration, ClientId),
+    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
+
+    send_willtopic_msg(Socket, WillTopic, QoS),
+    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
+
+    send_willmsg_msg(Socket, WillMsg),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+
+    send_pingreq_msg(Socket, undefined),
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
+
+    % wait udp client keepalive timeout
+    timer:sleep(2000),
+
+    receive
+        {deliver, WillTopic, #message{payload = WillMsg}} -> ok;
+        Msg -> ct:print("recevived --- unex: ~p", [Msg])
+    after
+        1000 -> ct:fail(wait_willmsg_timeout)
+    end,
+    send_disconnect_msg(Socket, undefined),
+
+    gen_udp:close(Socket).
+
 t_asleep_test01_timeout(_) ->
     QoS = 1,
     Duration = 1,
@@ -1564,6 +1599,15 @@ send_connect_msg_with_will(Socket, Duration, ClientId) ->
                       ?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>,
     ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
 
+send_connect_msg_with_will1(Socket, Duration, ClientId) ->
+    Length = 10,
+    Will = 1,
+    CleanSession = 0,
+    ProtocolId = 1,
+    ConnectPacket = <<Length:8, ?SN_CONNECT:8, ?FNU:4, Will:1, CleanSession:1,
+                      ?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>,
+    ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
+
 send_willtopic_msg(Socket, Topic, QoS) ->
     Length = 3+byte_size(Topic),
     MsgType = ?SN_WILLTOPIC,