Procházet zdrojové kódy

Merge pull request #2837 from emqx/fix_sock_closed

Fix a bug that session terminates immediately when received DISCONNECT packet
tigercl před 6 roky
rodič
revize
f26827a0b9
1 změnil soubory, kde provedl 17 přidání a 12 odebrání
  1. 17 12
      src/emqx_channel.erl

+ 17 - 12
src/emqx_channel.erl

@@ -73,6 +73,8 @@
           oom_policy :: emqx_oom:oom_policy(),
           %% Connected
           connected :: boolean(),
+          %% Disonnected
+          disconnected :: boolean(),
           %% Connected at
           connected_at :: erlang:timestamp(),
           disconnected_at :: erlang:timestamp(),
@@ -123,16 +125,17 @@ init(ConnInfo, Options) ->
                  end,
     GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)),
     OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)),
-    #channel{client     = Client,
-             session    = undefined,
-             protocol   = undefined,
-             gc_state   = GcState,
-             oom_policy = OomPolicy,
-             timers     = #{stats_timer => StatsTimer},
-             connected  = false,
-             takeover   = false,
-             resuming   = false,
-             pendings   = []
+    #channel{client       = Client,
+             session      = undefined,
+             protocol     = undefined,
+             gc_state     = GcState,
+             oom_policy   = OomPolicy,
+             timers       = #{stats_timer => StatsTimer},
+             connected    = false,
+             disconnected = false,
+             takeover     = false,
+             resuming     = false,
+             pendings     = []
             }.
 
 peer_cert_as_username(Options) ->
@@ -634,6 +637,8 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {ok, NChannel};
 
+handle_info(sock_closed, Channel = #channel{disconnected = true}) ->
+    {ok, Channel};
 handle_info(sock_closed, Channel = #channel{connected = false}) ->
     shutdown(closed, Channel);
 handle_info(sock_closed, Channel = #channel{protocol = Protocol,
@@ -1111,10 +1116,10 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId},
     end.
 
 ensure_connected(Channel) ->
-    Channel#channel{connected = true, connected_at = os:timestamp()}.
+    Channel#channel{connected = true, connected_at = os:timestamp(), disconnected = false}.
 
 ensure_disconnected(Channel) ->
-    Channel#channel{connected = false, disconnected_at = os:timestamp()}.
+    Channel#channel{connected = false, disconnected_at = os:timestamp(), disconnected = true}.
 
 ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
     ensure_keepalive_timer(Interval, Channel);