Просмотр исходного кода

Send DISCONNECT packet for mqttv5 (#3183) (#3208)

JianBo He 6 лет назад
Родитель
Сommit
e17241884c
2 измененных файлов с 21 добавлено и 9 удалено
  1. 20 8
      src/emqx_channel.erl
  2. 1 1
      test/emqx_channel_SUITE.erl

+ 20 - 8
src/emqx_channel.erl

@@ -700,14 +700,10 @@ return_unsuback(Packet, Channel) ->
        | {shutdown, Reason :: term(), Reply :: term(), channel()}).
        | {shutdown, Reason :: term(), Reply :: term(), channel()}).
 handle_call(kick, Channel) ->
 handle_call(kick, Channel) ->
     Channel1 = ensure_disconnected(kicked, Channel),
     Channel1 = ensure_disconnected(kicked, Channel),
-    shutdown(kicked, ok, Channel1);
+    disconnect_and_shutdown(kicked, ok, Channel1);
 
 
-handle_call(discard, Channel = #channel{conn_state = connected}) ->
-    Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
-    {shutdown, discarded, ok, Packet, Channel};
-
-handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
-    shutdown(discarded, ok, Channel);
+handle_call(discard, Channel) ->
+    disconnect_and_shutdown(discarded, ok, Channel);
 
 
 %% Session Takeover
 %% Session Takeover
 handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
 handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
@@ -719,7 +715,7 @@ handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
     %% TODO: Should not drain deliver here (side effect)
     %% TODO: Should not drain deliver here (side effect)
     Delivers = emqx_misc:drain_deliver(),
     Delivers = emqx_misc:drain_deliver(),
     AllPendings = lists:append(Delivers, Pendings),
     AllPendings = lists:append(Delivers, Pendings),
-    shutdown(takeovered, AllPendings, Channel);
+    disconnect_and_shutdown(takeovered, AllPendings, Channel);
 
 
 handle_call(list_acl_cache, Channel) ->
 handle_call(list_acl_cache, Channel) ->
     {reply, emqx_acl_cache:list_acl_cache(), Channel};
     {reply, emqx_acl_cache:list_acl_cache(), Channel};
@@ -1293,6 +1289,10 @@ publish_will_msg(Msg) -> emqx_broker:publish(Msg).
 disconnect_reason(?RC_SUCCESS) -> normal;
 disconnect_reason(?RC_SUCCESS) -> normal;
 disconnect_reason(ReasonCode)  -> emqx_reason_codes:name(ReasonCode).
 disconnect_reason(ReasonCode)  -> emqx_reason_codes:name(ReasonCode).
 
 
+reason_code(takeovered) -> ?RC_SESSION_TAKEN_OVER;
+reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER;
+reason_code(_) -> ?RC_NORMAL_DISCONNECTION.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Helper functions
 %% Helper functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -1330,6 +1330,18 @@ shutdown(success, Reply, Channel) ->
 shutdown(Reason, Reply, Channel) ->
 shutdown(Reason, Reply, Channel) ->
     {shutdown, Reason, Reply, Channel}.
     {shutdown, Reason, Reply, Channel}.
 
 
+shutdown(success, Reply, Packet, Channel) ->
+    shutdown(normal, Reply, Packet, Channel);
+shutdown(Reason, Reply, Packet, Channel) ->
+    {shutdown, Reason, Reply, Packet, Channel}.
+
+disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = connected,
+                                                          conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
+    shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
+
+disconnect_and_shutdown(Reason, Reply, Channel) ->
+    shutdown(Reason, Reply, Channel).
+
 sp(true)  -> 1;
 sp(true)  -> 1;
 sp(false) -> 0.
 sp(false) -> 0.
 
 

+ 1 - 1
test/emqx_channel_SUITE.erl

@@ -407,7 +407,7 @@ t_handle_call_takeover_begin(_) ->
 
 
 t_handle_call_takeover_end(_) ->
 t_handle_call_takeover_end(_) ->
     ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
     ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
-    {shutdown, takeovered, [], _Chan} =
+    {shutdown, takeovered, [], _, _Chan} =
         emqx_channel:handle_call({takeover, 'end'}, channel()).
         emqx_channel:handle_call({takeover, 'end'}, channel()).
 
 
 t_handle_call_unexpected(_) ->
 t_handle_call_unexpected(_) ->