Explorar o código

fix(channel): send will_msg if client has been kicked

JianBo He %!s(int64=4) %!d(string=hai) anos
pai
achega
df74c180b7
Modificáronse 2 ficheiros con 38 adicións e 10 borrados
  1. 17 9
      apps/emqx/src/emqx_channel.erl
  2. 21 1
      apps/emqx/test/emqx_channel_SUITE.erl

+ 17 - 9
apps/emqx/src/emqx_channel.erl

@@ -1141,10 +1141,18 @@ return_sub_unsub_ack(Packet, Channel) ->
     {reply, Reply :: term(), channel()}
     | {shutdown, Reason :: term(), Reply :: term(), channel()}
     | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}.
-handle_call(kick, Channel) ->
-    Channel1 = ensure_disconnected(kicked, Channel),
-    case Channel1 of
-        ?IS_MQTT_V5 ->
+handle_call(kick, Channel = #channel{
+                               conn_state = ConnState,
+                               will_msg = WillMsg,
+                               conninfo = #{proto_ver := ProtoVer}
+                              }) ->
+    (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
+    Channel1 = case ConnState of
+                   connected -> ensure_disconnected(kicked, Channel);
+                   _ -> Channel
+               end,
+    case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
+        true ->
             shutdown(kicked, ok,
                      ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1);
         _ ->
@@ -1227,7 +1235,7 @@ handle_info(
 ->
     emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
         emqx_flapping:detect(ClientInfo),
-    Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
+    Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
     case maybe_shutdown(Reason, Channel1) of
         {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
         Shutdown -> Shutdown
@@ -2051,9 +2059,9 @@ ensure_disconnected(
 %%--------------------------------------------------------------------
 %% Maybe Publish will msg
 
-mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
+maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
     Channel;
-mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
+maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
     case will_delay_interval(WillMsg) of
         0 ->
             ok = publish_will_msg(WillMsg),
@@ -2063,10 +2071,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
     end.
 
 will_delay_interval(WillMsg) ->
-    maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
+    maps:get('Will-Delay-Interval',
+             emqx_message:get_header(properties, WillMsg, #{}), 0).
 
 publish_will_msg(Msg) ->
-    %% TODO check if we should discard result here
     _ = emqx_broker:publish(Msg),
     ok.
 

+ 21 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -912,7 +912,27 @@ t_handle_call_kick(_) ->
     {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4),
     {shutdown, kicked, ok,
      ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
-     _} = emqx_channel:handle_call(kick, Channelv5).
+     _} = emqx_channel:handle_call(kick, Channelv5),
+
+    DisconnectedChannelv5 = channel(#{conn_state => disconnected}),
+    DisconnectedChannelv4 = v4(DisconnectedChannelv5),
+
+    {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5),
+    {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4).
+
+t_handle_kicked_publish_will_msg(_) ->
+    Self = self(),
+    ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
+
+    Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
+
+    {shutdown, kicked, ok,
+     ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
+     _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})),
+    receive
+        {pub, Msg} -> ok
+    after 200 -> ?assert(true)
+    end.
 
 t_handle_call_discard(_) ->
     Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),