Bladeren bron

Merge branch 'new-connection-module' into develop

Feng Lee 6 jaren geleden
bovenliggende
commit
932821360f
8 gewijzigde bestanden met toevoegingen van 646 en 629 verwijderingen
  1. 5 0
      etc/emqx.conf
  2. 6 0
      priv/emqx.schema
  3. 120 122
      src/emqx_channel.erl
  4. 364 357
      src/emqx_connection.erl
  5. 4 1
      src/emqx_types.erl
  6. 129 138
      src/emqx_ws_connection.erl
  7. 17 11
      test/emqx_channel_SUITE.erl
  8. 1 0
      test/emqx_pool_SUITE.erl

+ 5 - 0
etc/emqx.conf

@@ -554,6 +554,11 @@ mqtt.ignore_loop_deliver = false
 ## Value: duration
 zone.external.idle_timeout = 15s
 
+## Hibernate after a duration of idle state.
+##
+## Value: duration
+zone.external.hibernate_after = 60s
+
 ## Publish limit for the external MQTT connections.
 ##
 ## Value: Number,Duration

+ 6 - 0
priv/emqx.schema

@@ -725,6 +725,12 @@ end}.
   {datatype, {duration, ms}}
 ]}.
 
+%% @doc Hibernate after a duration of idle state.
+{mapping, "zone.$name.hibernate_after", "emqx.zones", [
+  {default, "60s"},
+  {datatype, {duration, ms}}
+]}.
+
 {mapping, "zone.$name.allow_anonymous", "emqx.zones", [
   {datatype, {enum, [true, false]}}
 ]}.

+ 120 - 122
src/emqx_channel.erl

@@ -31,7 +31,7 @@
         , caps/1
         ]).
 
-%% Exports for unit tests:(
+%% Test Exports
 -export([set_field/3]).
 
 -export([ init/2
@@ -40,14 +40,9 @@
         , handle_call/2
         , handle_info/2
         , handle_timeout/3
-        , disconnect/2
         , terminate/2
         ]).
 
--export([ received/2
-        , sent/2
-        ]).
-
 -import(emqx_misc,
         [ run_fold/3
         , pipeline/3
@@ -75,8 +70,8 @@
           pub_stats :: emqx_types:stats(),
           %% Timers
           timers :: #{atom() => disabled | maybe(reference())},
-          %% Fsm State
-          state :: fsm_state(),
+          %% Conn State
+          conn_state :: conn_state(),
           %% GC State
           gc_state :: maybe(emqx_gc:gc_state()),
           %% Takeover
@@ -89,13 +84,14 @@
 
 -opaque(channel() :: #channel{}).
 
--type(fsm_state() :: #{state_name := initialized
-                                   | connecting
-                                   | connected
-                                   | disconnected,
-                       connected_at := pos_integer(),
-                       disconnected_at := pos_integer()
-                      }).
+-type(conn_state() :: idle | connecting | connected | disconnected).
+
+-type(action() :: {enter, connected | disconnected}
+                | {close, Reason :: atom()}
+                | {outgoing, emqx_types:packet()}
+                | {outgoing, [emqx_types:packet()]}).
+
+-type(output() :: emqx_types:packet() | action() | [action()]).
 
 -define(TIMER_TABLE, #{
           stats_timer  => emit_stats,
@@ -106,7 +102,7 @@
           will_timer   => will_message
          }).
 
--define(ATTR_KEYS, [conninfo, clientinfo, state, session]).
+-define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]).
 
 -define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases,
                                   alias_maximum, gc_state]).
@@ -129,8 +125,8 @@ info(clientinfo, #channel{clientinfo = ClientInfo}) ->
     ClientInfo;
 info(session, #channel{session = Session}) ->
     maybe_apply(fun emqx_session:info/1, Session);
-info(state, #channel{state = State}) ->
-    State;
+info(conn_state, #channel{conn_state = ConnState}) ->
+    ConnState;
 info(keepalive, #channel{keepalive = Keepalive}) ->
     maybe_apply(fun emqx_keepalive:info/1, Keepalive);
 info(topic_aliases, #channel{topic_aliases = Aliases}) ->
@@ -204,7 +200,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
              clientinfo = ClientInfo,
              pub_stats  = #{},
              timers     = #{stats_timer => StatsTimer},
-             state      = #{state_name => initialized},
+             conn_state = idle,
              gc_state   = init_gc_state(Zone),
              takeover   = false,
              resuming   = false,
@@ -221,15 +217,16 @@ init_gc_state(Zone) ->
 %% Handle incoming packet
 %%--------------------------------------------------------------------
 
--spec(handle_in(emqx_types:packet(), channel())
+-spec(handle_in(Bytes :: pos_integer() | emqx_types:packet(), channel())
       -> {ok, channel()}
-       | {ok, emqx_types:packet(), channel()}
-       | {ok, list(emqx_types:packet()), channel()}
-       | {close, channel()}
-       | {close, emqx_types:packet(), channel()}
-       | {stop, Error :: term(), channel()}
-       | {stop, Error :: term(), emqx_types:packet(), channel()}).
-handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) ->
+       | {ok, output(), channel()}
+       | {stop, Reason :: term(), channel()}
+       | {stop, Reason :: term(), output(), channel()}).
+handle_in(Bytes, Channel) when is_integer(Bytes) ->
+    NChannel = maybe_gc_and_check_oom(Bytes, Channel),
+    {ok, ensure_timer(stats_timer, NChannel)};
+
+handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) ->
      handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
 
 handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
@@ -243,35 +240,36 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
         {ok, NConnPkt, NChannel} ->
             process_connect(NConnPkt, NChannel);
         {error, ReasonCode, NChannel} ->
-            handle_out({connack, emqx_reason_codes:formalized(connack, ReasonCode), ConnPkt}, NChannel)
+            ReasonName = emqx_reason_codes:formalized(connack, ReasonCode),
+            handle_out({connack, ReasonName, ConnPkt}, NChannel)
     end;
 
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
-    Channel1 = inc_pub_stats(publish_in, Channel),
+    NChannel = inc_pub_stats(publish_in, Channel),
     case emqx_packet:check(Packet) of
-        ok -> handle_publish(Packet, Channel1);
+        ok -> handle_publish(Packet, NChannel);
         {error, ReasonCode} ->
-            handle_out({disconnect, ReasonCode}, Channel1)
+            handle_out({disconnect, ReasonCode}, NChannel)
     end;
 
 handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
           Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
-    Channel1 = inc_pub_stats(puback_in, Channel),
+    NChannel = inc_pub_stats(puback_in, Channel),
     case emqx_session:puback(PacketId, Session) of
         {ok, Msg, Publishes, NSession} ->
             ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
-            handle_out({publish, Publishes}, Channel1#channel{session = NSession});
+            handle_out({publish, Publishes}, NChannel#channel{session = NSession});
         {ok, Msg, NSession} ->
             ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
-            {ok, Channel1#channel{session = NSession}};
+            {ok, NChannel#channel{session = NSession}};
         {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
             ok = emqx_metrics:inc('packets.puback.inuse'),
-            {ok, Channel1};
+            {ok, NChannel};
         {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
             ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
             ok = emqx_metrics:inc('packets.puback.missed'),
-            {ok, Channel1}
+            {ok, NChannel}
     end;
 
 handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
@@ -360,24 +358,24 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
             shutdown(ReasonName, Channel1);
         true ->
             Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}},
-            {close, ReasonName, Channel2}
+            {ok, {close, ReasonName}, Channel2}
     end;
 
 handle_in(?AUTH_PACKET(), Channel) ->
     handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel);
 
-handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) ->
-    case FsmState of
-        #{state_name := initialized} ->
-            shutdown(Reason, Channel);
-        #{state_name := connecting} ->
-            shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
-        #{state_name := connected} ->
-            handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel);
-        #{state_name := disconnected} ->
-            ?LOG(error, "Unexpected frame error: ~p", [Reason]),
-            {ok, Channel}
-    end;
+handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
+    shutdown(Reason, Channel);
+
+handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
+    shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
+
+handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) ->
+    handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel);
+
+handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
+    ?LOG(error, "Unexpected frame error: ~p", [Reason]),
+    {ok, Channel};
 
 handle_in(Packet, Channel) ->
     ?LOG(error, "Unexpected incoming: ~p", [Packet]),
@@ -528,32 +526,61 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
 
-%%TODO: RunFold or Pipeline
+-spec(handle_out(integer()|term(), channel())
+      -> {ok, channel()}
+       | {ok, output(), channel()}
+       | {stop, Reason :: term(), channel()}
+       | {stop, Reason :: term(), output(), channel()}).
+handle_out(Bytes, Channel) when is_integer(Bytes) ->
+    NChannel = maybe_gc_and_check_oom(Bytes, Channel),
+    {ok, ensure_timer(stats_timer, NChannel)};
+
+handle_out(Delivers, Channel = #channel{conn_state = disconnected,
+                                        session = Session})
+  when is_list(Delivers) ->
+    NSession = emqx_session:enqueue(Delivers, Session),
+    {ok, Channel#channel{session = NSession}};
+
+handle_out(Delivers, Channel = #channel{takeover = true,
+                                        pendings = Pendings})
+  when is_list(Delivers) ->
+    {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
+
+handle_out(Delivers, Channel = #channel{session = Session}) when is_list(Delivers) ->
+    case emqx_session:deliver(Delivers, Session) of
+        {ok, Publishes, NSession} ->
+            NChannel = Channel#channel{session = NSession},
+            handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel));
+        {ok, NSession} ->
+            {ok, Channel#channel{session = NSession}}
+    end;
+
 handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
            Channel = #channel{conninfo   = ConnInfo,
-                              clientinfo = ClientInfo,
-                              state      = FsmState}) ->
+                              clientinfo = ClientInfo}) ->
     AckProps = run_fold([fun enrich_caps/2,
                          fun enrich_server_keepalive/2,
                          fun enrich_assigned_clientid/2], #{}, Channel),
-    FsmState1 = FsmState#{state_name => connected,
-                          connected_at => erlang:system_time(second)
-                         },
-    Channel1 = Channel#channel{state = FsmState1,
+    ConnInfo1 = ConnInfo#{connected_at => erlang:system_time(second)},
+    Channel1 = Channel#channel{conninfo = ConnInfo1,
                                will_msg = emqx_packet:will_msg(ConnPkt),
+                               conn_state = connected,
                                alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
                               },
     Channel2 = ensure_keepalive(AckProps, Channel1),
     ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
     AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
     case maybe_resume_session(Channel2) of
-        ignore -> {ok, AckPacket, Channel2};
+        ignore ->
+            Output = [{outgoing, AckPacket}, {enter, connected}],
+            {ok, Output, Channel2};
         {ok, Publishes, NSession} ->
             Channel3 = Channel2#channel{session  = NSession,
                                         resuming = false,
                                         pendings = []},
-            {ok, Packets, _} = handle_out({publish, Publishes}, Channel3),
-            {ok, [AckPacket|Packets], Channel3}
+            {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
+            Output = [{outgoing, [AckPacket|Packets]}, {enter, connected}],
+            {ok, Output, Channel3}
     end;
 
 handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
@@ -566,24 +593,6 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
 
-handle_out({deliver, Delivers}, Channel = #channel{state   = #{state_name := disconnected},
-                                                   session = Session}) ->
-    NSession = emqx_session:enqueue(Delivers, Session),
-    {ok, Channel#channel{session = NSession}};
-
-handle_out({deliver, Delivers}, Channel = #channel{takeover = true,
-                                                   pendings = Pendings}) ->
-    {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
-
-handle_out({deliver, Delivers}, Channel = #channel{session = Session}) ->
-    case emqx_session:deliver(Delivers, Session) of
-        {ok, Publishes, NSession} ->
-            NChannel = Channel#channel{session = NSession},
-            handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel));
-        {ok, NSession} ->
-            {ok, Channel#channel{session = NSession}}
-    end;
-
 handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
     Packets = lists:foldl(
                 fun(Publish, Acc) ->
@@ -594,12 +603,12 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
                     end
                 end, [], Publishes),
     NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
-    {ok, lists:reverse(Packets), NChannel};
+    {ok, {outgoing, lists:reverse(Packets)}, NChannel};
 
 %% Ignore loop deliver
 handle_out({publish, _PacketId, #message{from  = ClientId,
                                          flags = #{nl := true}}},
-            Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
+           Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
     {ok, Channel};
 
 handle_out({publish, PacketId, Msg}, Channel =
@@ -640,7 +649,6 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver :
     ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer),
     handle_out({disconnect, ReasonCode, ReasonName}, Channel);
 
-%%TODO: Improve later...
 handle_out({disconnect, ReasonCode, ReasonName},
            Channel = #channel{conninfo = #{proto_ver := ProtoVer,
                                            expiry_interval := ExpiryInterval}}) ->
@@ -650,14 +658,19 @@ handle_out({disconnect, ReasonCode, ReasonName},
         {0, _Ver} ->
             shutdown(ReasonName, Channel);
         {?UINT_MAX, ?MQTT_PROTO_V5} ->
-            {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel};
+            Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
+                      {close, ReasonName}],
+            {ok, Output, Channel};
         {?UINT_MAX, _Ver} ->
-            {close, ReasonName, Channel};
+            {ok, {close, ReasonName}, Channel};
         {Interval, ?MQTT_PROTO_V5} ->
             NChannel = ensure_timer(expire_timer, Interval, Channel),
-            {close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), NChannel};
+            Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
+                      {close, ReasonName}],
+            {ok, Output, NChannel};
         {Interval, _Ver} ->
-            {close, ReasonName, ensure_timer(expire_timer, Interval, Channel)}
+            NChannel = ensure_timer(expire_timer, Interval, Channel),
+            {ok, {close, ReasonName}, NChannel}
     end;
 
 handle_out({Type, Data}, Channel) ->
@@ -668,28 +681,33 @@ handle_out({Type, Data}, Channel) ->
 %% Handle call
 %%--------------------------------------------------------------------
 
+-spec(handle_call(Req :: term(), channel())
+      -> {reply, Reply :: term(), channel()}
+       | {stop, Reason :: term(), Reply :: term(), channel()}).
 handle_call(kick, Channel) ->
     {stop, {shutdown, kicked}, ok, Channel};
 
-handle_call(discard, Channel = #channel{state = #{state_name := connected}}) ->
+handle_call(discard, Channel = #channel{conn_state = connected}) ->
     Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
-    {stop, {shutdown, discarded}, Packet, ok, Channel};
-handle_call(discard, Channel = #channel{state = #{state_name := disconnected}}) ->
+    {stop, {shutdown, discarded}, ok, Packet, Channel};
+
+handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
     {stop, {shutdown, discarded}, ok, Channel};
 
 %% Session Takeover
 handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
-    {ok, Session, Channel#channel{takeover = true}};
+    {reply, Session, Channel#channel{takeover = true}};
 
 handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
                                                   pendings = Pendings}) ->
     ok = emqx_session:takeover(Session),
-    AllPendings = lists:append(emqx_misc:drain_deliver(), Pendings),
+    Delivers = emqx_misc:drain_deliver(),
+    AllPendings = lists:append(Delivers, Pendings),
     {stop, {shutdown, takeovered}, AllPendings, Channel};
 
 handle_call(Req, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
-    {ok, ignored, Channel}.
+    {reply, ignored, Channel}.
 
 %%--------------------------------------------------------------------
 %% Handle Info
@@ -716,26 +734,21 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client
     emqx_cm:set_chan_attrs(ClientId, Attrs),
     emqx_cm:set_chan_stats(ClientId, Stats);
 
-%%TODO: Fixme later
-%%handle_info(disconnected, Channel = #channel{connected = undefined}) ->
-%%    shutdown(closed, Channel);
-
-handle_info(disconnected, Channel = #channel{state = #{state_name := disconnected}}) ->
+handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
     {ok, Channel};
 
-handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval},
-                                             clientinfo = ClientInfo = #{zone := Zone},
-                                             will_msg = WillMsg}) ->
+handle_info({sock_closed, _Reason}, Channel = #channel{conninfo = ConnInfo,
+                                                       clientinfo = ClientInfo = #{zone := Zone},
+                                                       will_msg = WillMsg}) ->
     emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
-    Channel1 = ensure_disconnected(Channel),
+    ConnInfo1 = ConnInfo#{disconnected_at => erlang:system_time(second)},
+    Channel1 = Channel#channel{conninfo = ConnInfo1, conn_state = disconnected},
     Channel2 = case timer:seconds(will_delay_interval(WillMsg)) of
-                   0 ->
-                       publish_will_msg(WillMsg),
-                       Channel1#channel{will_msg = undefined};
-                   _ ->
-                       ensure_timer(will_timer, Channel1)
+                   0 -> publish_will_msg(WillMsg),
+                        Channel1#channel{will_msg = undefined};
+                   _ -> ensure_timer(will_timer, Channel1)
                end,
-    case ExpiryInterval of
+    case maps:get(expiry_interval, ConnInfo) of
         ?UINT_MAX ->
             {ok, Channel2};
         Int when Int > 0 ->
@@ -746,6 +759,7 @@ handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := Exp
 
 handle_info(Info, Channel) ->
     ?LOG(error, "Unexpected info: ~p~n", [Info]),
+    error(unexpected_info),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------
@@ -859,14 +873,11 @@ will_delay_interval(undefined) -> 0;
 will_delay_interval(WillMsg) ->
     emqx_message:get_header('Will-Delay-Interval', WillMsg, 0).
 
-%% TODO: Implement later.
-disconnect(_Reason, Channel) -> {ok, Channel}.
-
 %%--------------------------------------------------------------------
 %% Terminate
 %%--------------------------------------------------------------------
 
-terminate(_, #channel{state = #{state_name := initialized}}) ->
+terminate(_, #channel{conn_state = idle}) ->
     ok;
 terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
     ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
@@ -877,14 +888,6 @@ terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_ms
     publish_will_msg(WillMsg),
     ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
 
--spec(received(pos_integer(), channel()) -> channel()).
-received(Oct, Channel) ->
-    ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
-
--spec(sent(pos_integer(), channel()) -> channel()).
-sent(Oct, Channel) ->
-    ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
-
 %% TODO: Improve will msg:)
 publish_will_msg(undefined) ->
     ok;
@@ -1142,11 +1145,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
       inbound  => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)};
 init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
 
-ensure_disconnected(Channel = #channel{state = FsmState}) ->
-    Channel#channel{state = FsmState#{state_name := disconnected,
-                                      disconnected_at => erlang:system_time(second)
-                                     }}.
-
 ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
     ensure_keepalive_timer(Interval, Channel);
 ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->

+ 364 - 357
src/emqx_connection.erl

@@ -14,41 +14,43 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT TCP/SSL Connection
+%% MQTT/TCP Connection
 -module(emqx_connection).
 
--behaviour(gen_statem).
-
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 
--logger_header("[Connection]").
+-logger_header("[MQTT]").
 
--export([start_link/3]).
+%% API
+-export([ start_link/3
+        , stop/1
+        ]).
 
-%% APIs
 -export([ info/1
         , stats/1
         ]).
 
 -export([call/2]).
 
-%% state callbacks
--export([ idle/3
-        , connected/3
-        , disconnected/3
-        ]).
+%% callback
+-export([init/4]).
 
-%% gen_statem callbacks
--export([ init/1
-        , callback_mode/0
-        , code_change/4
-        , terminate/3
+%% Sys callbacks
+-export([ system_continue/3
+        , system_terminate/4
+        , system_code_change/4
+        , system_get_state/1
         ]).
 
+%% Internal callbacks
+-export([wakeup_from_hib/2]).
+
 -record(state, {
+          %% Parent
+          parent :: pid(),
           %% TCP/TLS Transport
           transport :: esockd:transport(),
           %% TCP/TLS Socket
@@ -57,10 +59,10 @@
           peername :: emqx_types:peername(),
           %% Sockname of the connection
           sockname :: emqx_types:peername(),
+          %% Sock state
+          sockstate :: emqx_types:sockstate(),
           %% The {active, N} option
           active_n :: pos_integer(),
-          %% The active state
-          active_state :: running | blocked,
           %% Publish Limit
           pub_limit :: maybe(esockd_rate_limit:bucket()),
           %% Rate Limit
@@ -72,94 +74,107 @@
           %% Serialize function
           serialize :: emqx_frame:serialize_fun(),
           %% Channel State
-          chan_state :: emqx_channel:channel()
+          channel :: emqx_channel:channel(),
+          %% Idle timer
+          idle_timer :: reference()
         }).
 
 -type(state() :: #state{}).
 
 -define(ACTIVE_N, 100).
--define(HANDLE(T, C, D), handle((T), (C), (D))).
--define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state,
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n,
                     pub_limit, rate_limit]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
-%% @doc Start the connection.
 -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
       -> {ok, pid()}).
 start_link(Transport, Socket, Options) ->
-    {ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}.
+    CPid = proc_lib:spawn_link(?MODULE, init, [self(), Transport, Socket, Options]),
+    {ok, CPid}.
 
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
-%% @doc Get infos of the connection.
+%% @doc Get infos of the connection/channel.
 -spec(info(pid()|state()) -> emqx_types:infos()).
 info(CPid) when is_pid(CPid) ->
     call(CPid, info);
-info(Conn = #state{chan_state = ChanState}) ->
-    ChanInfo = emqx_channel:info(ChanState),
-    SockInfo = maps:from_list(info(?INFO_KEYS, Conn)),
+info(State = #state{channel = Channel}) ->
+    ChanInfo = emqx_channel:info(Channel),
+    SockInfo = maps:from_list(info(?INFO_KEYS, State)),
     maps:merge(ChanInfo, #{sockinfo => SockInfo}).
 
-info(Keys, Conn) when is_list(Keys) ->
-    [{Key, info(Key, Conn)} || Key <- Keys];
+info(Keys, State) when is_list(Keys) ->
+    [{Key, info(Key, State)} || Key <- Keys];
 info(socktype, #state{transport = Transport, socket = Socket}) ->
     Transport:type(Socket);
 info(peername, #state{peername = Peername}) ->
     Peername;
 info(sockname, #state{sockname = Sockname}) ->
     Sockname;
+info(sockstate, #state{sockstate = SockSt}) ->
+    SockSt;
 info(active_n, #state{active_n = ActiveN}) ->
     ActiveN;
-info(active_state, #state{active_state = ActiveSt}) ->
-    ActiveSt;
 info(pub_limit, #state{pub_limit = PubLimit}) ->
     limit_info(PubLimit);
 info(rate_limit, #state{rate_limit = RateLimit}) ->
     limit_info(RateLimit);
-info(chan_state, #state{chan_state = ChanState}) ->
-    emqx_channel:info(ChanState).
+info(channel, #state{channel = Channel}) ->
+    emqx_channel:info(Channel).
 
 limit_info(Limit) ->
     emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
 
-%% @doc Get stats of the channel.
+%% @doc Get stats of the connection/channel.
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(CPid) when is_pid(CPid) ->
     call(CPid, stats);
-stats(#state{transport  = Transport,
-             socket     = Socket,
-             chan_state = ChanState}) ->
+stats(#state{transport = Transport,
+             socket    = Socket,
+             channel   = Channel}) ->
     SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
                     {ok, Ss}   -> Ss;
                     {error, _} -> []
                 end,
     ConnStats = emqx_pd:get_counters(?CONN_STATS),
-    ChanStats = emqx_channel:stats(ChanState),
+    ChanStats = emqx_channel:stats(Channel),
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
-%% kick|discard|takeover
--spec(call(pid(), Req :: term()) -> Reply :: term()).
-call(CPid, Req) -> gen_statem:call(CPid, Req).
+call(Pid, Req) ->
+    gen_server:call(Pid, Req, infinity).
+
+stop(Pid) ->
+    gen_server:stop(Pid).
 
 %%--------------------------------------------------------------------
-%% gen_statem callbacks
+%% callbacks
 %%--------------------------------------------------------------------
 
-init({Transport, RawSocket, Options}) ->
+init(Parent, Transport, RawSocket, Options) ->
     case Transport:wait(RawSocket) of
-         {ok, Socket} ->
-             do_init(Transport, Socket, Options);
-         {error, Reason} ->
-             ?LOG(warning, "connection failed to establish: ~p", [Reason])
+        {ok, Socket} ->
+            do_init(Parent, Transport, Socket, Options);
+        {error, Reason} when Reason =:= enotconn;
+                             Reason =:= einval;
+                             Reason =:= closed ->
+            Transport:fast_close(RawSocket),
+            exit(normal);
+        {error, timeout} ->
+            Transport:fast_close(RawSocket),
+            exit({shutdown, ssl_upgrade_timeout});
+        {error, Reason} ->
+            Transport:fast_close(RawSocket),
+            exit(Reason)
     end.
 
-do_init(Transport, Socket, Options) ->
+do_init(Parent, Transport, Socket, Options) ->
     {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
     {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
+    emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
     Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
     ConnInfo = #{socktype => Transport:type(Socket),
                  peername => Peername,
@@ -167,7 +182,6 @@ do_init(Transport, Socket, Options) ->
                  peercert => Peercert,
                  conn_mod => ?MODULE
                 },
-    emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
     Zone = proplists:get_value(zone, Options),
     ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
     PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
@@ -175,328 +189,315 @@ do_init(Transport, Socket, Options) ->
     FrameOpts = emqx_zone:frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_fun(),
-    ChanState = emqx_channel:init(ConnInfo, Options),
-    State = #state{transport    = Transport,
-                   socket       = Socket,
-                   peername     = Peername,
-                   sockname     = Sockname,
-                   active_n     = ActiveN,
-                   active_state = running,
-                   pub_limit    = PubLimit,
-                   rate_limit   = RateLimit,
-                   parse_state  = ParseState,
-                   serialize    = Serialize,
-                   chan_state   = ChanState
-                  },
+    Channel = emqx_channel:init(ConnInfo, Options),
     IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
-    gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}],
-                          idle, State, self(), [IdleTimout]).
+    IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout),
+    HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2),
+    State = #state{parent      = Parent,
+                   transport   = Transport,
+                   socket      = Socket,
+                   peername    = Peername,
+                   sockname    = Sockname,
+                   sockstate   = idle,
+                   active_n    = ActiveN,
+                   pub_limit   = PubLimit,
+                   rate_limit  = RateLimit,
+                   parse_state = ParseState,
+                   serialize   = Serialize,
+                   channel     = Channel,
+                   idle_timer  = IdleTimer
+                  },
+    case activate_socket(State) of
+        {ok, NState} ->
+            recvloop(NState, #{hibernate_after => HibAfterTimeout});
+        {error, Reason} when Reason =:= einval;
+                             Reason =:= enotconn;
+                             Reason =:= closed ->
+            Transport:fast_close(Socket),
+            exit(normal);
+        {error, Reason} ->
+            Transport:fast_close(Socket),
+            erlang:exit({shutdown, Reason})
+    end.
 
 -compile({inline, [init_limiter/1]}).
 init_limiter(undefined) -> undefined;
 init_limiter({Rate, Burst}) ->
     esockd_rate_limit:new(Rate, Burst).
 
--compile({inline, [callback_mode/0]}).
-callback_mode() ->
-    [state_functions, state_enter].
-
 %%--------------------------------------------------------------------
-%% Idle State
-
-idle(enter, _, State) ->
-    case activate_socket(State) of
-        ok -> keep_state_and_data;
-        {error, Reason} ->
-            shutdown(Reason, State)
-    end;
-
-idle(timeout, _Timeout, State) ->
-    shutdown(idle_timeout, State);
-
-idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
-    SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
-    Serialize = emqx_frame:serialize_fun(ConnPkt),
-    NState = State#state{serialize = Serialize},
-    handle_incoming(Packet, SuccFun, NState);
-
-idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
-    SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
-    handle_incoming(Packet, SuccFun, State);
-
-idle(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
-    handle_incoming(FrameError, State);
-
-idle(EventType, Content, State) ->
-    ?HANDLE(EventType, Content, State).
-
-%%--------------------------------------------------------------------
-%% Connected State
-
-connected(enter, _PrevSt, State) ->
-    ok = register_self(State),
-    keep_state_and_data;
-
-connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
-    handle_incoming(Packet, fun keep_state/1, State);
-
-connected(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
-    handle_incoming(FrameError, State);
-
-connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
-    handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
-
-connected(EventType, Content, State) ->
-    ?HANDLE(EventType, Content, State).
-
-%%--------------------------------------------------------------------
-%% Disconnected State
-
-disconnected(enter, _, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_info(disconnected, ChanState) of
-        {ok, NChanState} ->
-            ok = register_self(State#state{chan_state = NChanState}),
-            keep_state(State#state{chan_state = NChanState});
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+%% Recv Loop
+
+recvloop(State = #state{parent = Parent},
+         Options = #{hibernate_after := HibAfterTimeout}) ->
+    receive
+        {system, From, Request} ->
+            sys:handle_system_msg(Request, From, Parent,
+                                  ?MODULE, [], {State, Options});
+        {'EXIT', Parent, Reason} ->
+            terminate(Reason, State);
+        Msg ->
+            process_msg([Msg], State, Options)
+    after
+        HibAfterTimeout ->
+            hibernate(State, Options)
+    end.
 
-disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
-    handle_deliver([Deliver], State);
+hibernate(State, Options) ->
+    proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]).
 
-disconnected(EventType, Content, State) ->
-    ?HANDLE(EventType, Content, State).
+wakeup_from_hib(State, Options) ->
+    %% Maybe do something later here.
+    recvloop(State, Options).
 
 %%--------------------------------------------------------------------
-%% Handle call
-
-handle({call, From}, info, State) ->
-    reply(From, info(State), State);
-
-handle({call, From}, stats, State) ->
-    reply(From, stats(State), State);
-
-handle({call, From}, state, State) ->
-    reply(From, State, State);
-
-handle({call, From}, Req, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_call(Req, ChanState) of
-        {ok, Reply, NChanState} ->
-            reply(From, Reply, State#state{chan_state = NChanState});
-        {stop, Reason, Reply, NChanState} ->
-            ok = gen_statem:reply(From, Reply),
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, Packet, Reply, NChanState} ->
-            handle_outgoing(Packet, State#state{chan_state = NChanState}),
-            ok = gen_statem:reply(From, Reply),
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+%% Process next Msg
+
+process_msg([], State, Options) ->
+    recvloop(State, Options);
+
+process_msg([Msg|More], State, Options) ->
+    case catch handle_msg(Msg, State) of
+        ok ->
+            process_msg(More, State, Options);
+        {ok, NState} ->
+            process_msg(More, NState, Options);
+        {ok, NextMsgs, NState} ->
+            process_msg(append_msg(NextMsgs, More), NState, Options);
+        {stop, Reason} ->
+            terminate(Reason, State);
+        {stop, Reason, NState} ->
+            terminate(Reason, NState);
+        {'EXIT', Reason} ->
+            terminate(Reason, State)
+    end.
 
 %%--------------------------------------------------------------------
-%% Handle cast
-
-handle(cast, Msg, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_info(Msg, ChanState) of
-        ok -> {ok, State};
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
+%% Handle a Msg
+
+handle_msg({'$gen_call', From, Req}, State) ->
+    case handle_call(From, Req, State) of
+        {reply, Reply, NState} ->
+            gen_server:reply(From, Reply),
+            {ok, NState};
+        {stop, Reason, Reply, NState} ->
+            gen_server:reply(From, Reply),
+            stop(Reason, NState)
     end;
 
-%%--------------------------------------------------------------------
-%% Handle info
-
-%% Handle incoming data
-handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState})
+handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
   when Inet == tcp; Inet == ssl ->
     ?LOG(debug, "RECV ~p", [Data]),
     Oct = iolist_size(Data),
     emqx_pd:update_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
-    NChanState = emqx_channel:received(Oct, ChanState),
-    NState = State#state{chan_state = NChanState},
-    process_incoming(Data, NState);
+    {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
+    process_incoming(Data, State#state{channel = NChannel});
+
+handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
+           State = #state{idle_timer = IdleTimer}) ->
+    ok = emqx_misc:cancel_timer(IdleTimer),
+    Serialize = emqx_frame:serialize_fun(ConnPkt),
+    NState = State#state{serialize  = Serialize,
+                         idle_timer = undefined
+                        },
+    handle_incoming(Packet, NState);
 
-handle(info, {Error, _Sock, Reason}, State)
+handle_msg({incoming, Packet}, State) ->
+    handle_incoming(Packet, State);
+
+handle_msg({Error, _Sock, Reason}, State)
   when Error == tcp_error; Error == ssl_error ->
-    shutdown(Reason, State);
+    handle_info({sock_error, Reason}, State);
 
-handle(info, {Closed, _Sock}, State)
+handle_msg({Closed, _Sock}, State)
   when Closed == tcp_closed; Closed == ssl_closed ->
-    {next_state, disconnected, State};
+    handle_info(sock_closed, State);
 
-handle(info, {Passive, _Sock}, State)
+handle_msg({Passive, _Sock}, State)
   when Passive == tcp_passive; Passive == ssl_passive ->
-    %% Rate limit here:)
+    %% Rate limit and activate socket here.
     NState = ensure_rate_limit(State),
     case activate_socket(NState) of
-        ok -> keep_state(NState);
+        {ok, NState} -> {ok, NState};
         {error, Reason} ->
-            shutdown(Reason, NState)
+            {ok, {sock_error, Reason}, NState}
     end;
 
-handle(info, activate_socket, State) ->
-    %% Rate limit timer expired.
-    NState = State#state{active_state = running,
-                         limit_timer  = undefined
+%% Rate limit timer expired.
+handle_msg(activate_socket, State) ->
+    NState = State#state{sockstate   = idle,
+                         limit_timer = undefined
                         },
     case activate_socket(NState) of
-        ok -> keep_state(NState);
+        {ok, NState} -> {ok, NState};
         {error, Reason} ->
-            shutdown(Reason, NState)
+            {ok, {sock_error, Reason}, State}
     end;
 
-handle(info, {inet_reply, _Sock, ok}, _State) ->
-    %% something sent
-    keep_state_and_data;
+handle_msg(Deliver = {deliver, _Topic, _Msg},
+           State = #state{channel = Channel}) ->
+    Delivers = emqx_misc:drain_deliver([Deliver]),
+    Result = emqx_channel:handle_out(Delivers, Channel),
+    handle_return(Result, State);
 
-handle(info, {inet_reply, _Sock, {error, Reason}}, State) ->
-    shutdown(Reason, State);
+handle_msg({outgoing, Packets}, State) ->
+    {ok, handle_outgoing(Packets, State)};
 
-handle(info, {timeout, TRef, keepalive},
-       State = #state{transport = Transport, socket = Socket}) ->
-    case Transport:getstat(Socket, [recv_oct]) of
-        {ok, [{recv_oct, RecvOct}]} ->
-            handle_timeout(TRef, {keepalive, RecvOct}, State);
-        {error, Reason} ->
-            shutdown(Reason, State)
-    end;
+%% something sent
+handle_msg({inet_reply, _Sock, ok}, _State) ->
+    ok;
 
-handle(info, {timeout, TRef, emit_stats}, State) ->
-    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
+    handle_info({sock_error, Reason}, State);
 
-handle(info, {timeout, TRef, Msg}, State) ->
-    handle_timeout(TRef, Msg, State);
+handle_msg({timeout, TRef, TMsg}, State) ->
+    handle_timeout(TRef, TMsg, State);
 
-handle(info, {shutdown, Reason}, State) ->
-    shutdown(Reason, State);
+handle_msg(Shutdown = {shutdown, _Reason}, State) ->
+    stop(Shutdown, State);
 
-handle(info, Info, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_info(Info, ChanState) of
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end.
+handle_msg(Msg, State) -> handle_info(Msg, State).
 
-code_change(_Vsn, State, Data, _Extra) ->
-    {ok, State, Data}.
+%%--------------------------------------------------------------------
+%% Terminate
 
-terminate(Reason, _StateName, #state{transport  = Transport,
-                                     socket     = Socket,
-                                     chan_state = ChanState
-                                    }) ->
+terminate(Reason, #state{transport = Transport,
+                         socket    = Socket,
+                         sockstate = SockSt,
+                         channel   = Channel}) ->
     ?LOG(debug, "Terminated for ~p", [Reason]),
-    ok = Transport:fast_close(Socket),
-    emqx_channel:terminate(Reason, ChanState).
+    SockSt =:= closed orelse Transport:fast_close(Socket),
+    emqx_channel:terminate(Reason, Channel),
+    exit(Reason).
+
+%%--------------------------------------------------------------------
+%% Sys callbacks
+
+system_continue(_Parent, _Deb, {State, Options}) ->
+    recvloop(State, Options).
+
+system_terminate(Reason, _Parent, _Deb, {State, _}) ->
+    terminate(Reason, State).
+
+system_code_change(Misc, _, _, _) ->
+    {ok, Misc}.
+
+system_get_state({State, _Options}) ->
+    {ok, State}.
 
 %%--------------------------------------------------------------------
-%% Internal functions
+%% Handle call
+
+handle_call(_From, info, State) ->
+    {reply, info(State), State};
+
+handle_call(_From, stats, State) ->
+    {reply, stats(State), State};
+
+handle_call(_From, Req, State = #state{channel = Channel}) ->
+    case emqx_channel:handle_call(Req, Channel) of
+        {reply, Reply, NChannel} ->
+            {reply, Reply, State#state{channel = NChannel}};
+        {stop, Reason, Reply, NChannel} ->
+            {stop, Reason, Reply, State#state{channel = NChannel}};
+        {stop, Reason, Reply, OutPacket, NChannel} ->
+            NState = State#state{channel = NChannel},
+            NState1 = handle_outgoing(OutPacket, NState),
+            {stop, Reason, Reply, NState1}
+    end.
+
 %%--------------------------------------------------------------------
+%% Handle timeout
 
-register_self(State = #state{active_n = ActiveN,
-                             active_state = ActiveSt,
-                             chan_state = ChanState
-                            }) ->
-    ChanAttrs = emqx_channel:attrs(ChanState),
-    SockAttrs = #{active_n => ActiveN,
-                  active_state => ActiveSt
-                 },
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState).
+handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
+    stop(idle_timeout, State);
+
+handle_timeout(TRef, emit_stats, State) ->
+    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+
+handle_timeout(TRef, keepalive, State = #state{transport = Transport,
+                                               socket    = Socket}) ->
+    case Transport:getstat(Socket, [recv_oct]) of
+        {ok, [{recv_oct, RecvOct}]} ->
+            handle_timeout(TRef, {keepalive, RecvOct}, State);
+        {error, Reason} ->
+            handle_info({sockerr, Reason}, State)
+    end;
+
+handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
 
 %%--------------------------------------------------------------------
-%% Process incoming data
+%% Process/Parse incoming data.
 
 -compile({inline, [process_incoming/2]}).
 process_incoming(Data, State) ->
-    process_incoming(Data, [], State).
+    {Packets, NState} = parse_incoming(Data, State),
+    {ok, next_incoming_msgs(Packets), NState}.
+
+-compile({inline, [parse_incoming/2]}).
+parse_incoming(Data, State) ->
+    parse_incoming(Data, [], State).
 
-process_incoming(<<>>, Packets, State) ->
-    keep_state(State, next_incoming_events(Packets));
+parse_incoming(<<>>, Packets, State) ->
+    {Packets, State};
 
-process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
+parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
     try emqx_frame:parse(Data, ParseState) of
         {more, NParseState} ->
-            NState = State#state{parse_state = NParseState},
-            keep_state(NState, next_incoming_events(Packets));
+            {Packets, State#state{parse_state = NParseState}};
         {ok, Packet, Rest, NParseState} ->
             NState = State#state{parse_state = NParseState},
-            process_incoming(Rest, [Packet|Packets], NState)
+            parse_incoming(Rest, [Packet|Packets], NState)
     catch
         error:Reason:Stk ->
             ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p",
                  [Reason, Stk, Data]),
-            keep_state(State, next_incoming_events(Packets++[{frame_error, Reason}]))
+            {[{frame_error, Reason}|Packets], State}
     end.
 
--compile({inline, [next_incoming_events/1]}).
-next_incoming_events([]) -> [];
-next_incoming_events(Packets) ->
-    [next_event(cast, {incoming, Packet}) || Packet <- Packets].
+next_incoming_msgs([Packet]) ->
+    {incoming, Packet};
+next_incoming_msgs(Packets) ->
+    [{incoming, Packet} || Packet <- lists:reverse(Packets)].
 
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 
-handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) ->
+handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
     _ = inc_incoming_stats(Type),
-    _ = emqx_metrics:inc_recv(Packet),
+    ok = emqx_metrics:inc_recv(Packet),
     ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
-    case emqx_channel:handle_in(Packet, ChanState) of
-        {ok, NChanState} ->
-            SuccFun(State#state{chan_state= NChanState});
-        {ok, OutPackets, NChanState} ->
-            NState = State#state{chan_state = NChanState},
-            handle_outgoing(OutPackets, SuccFun, NState);
-        {close, Reason, NChanState} ->
-            close(Reason, State#state{chan_state = NChanState});
-        {close, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
-    end.
-
-handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_in(FrameError, ChanState) of
-        {close, Reason, NChanState} ->
-            close(Reason, State#state{chan_state = NChanState});
-        {close, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
-    end.
+    handle_return(emqx_channel:handle_in(Packet, Channel), State);
 
-%%-------------------------------------------------------------------
-%% Handle deliver
+handle_incoming(FrameError, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_in(FrameError, Channel), State).
 
-handle_deliver(Delivers, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_out({deliver, Delivers}, ChanState) of
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {ok, Packets, NChanState} ->
-            handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState})
-    end.
+%%--------------------------------------------------------------------
+%% Handle channel return
+
+handle_return(ok, State) ->
+    {ok, State};
+handle_return({ok, NChannel}, State) ->
+    {ok, State#state{channel = NChannel}};
+handle_return({ok, Replies, NChannel}, State) ->
+    {ok, next_msgs(Replies), State#state{channel = NChannel}};
+handle_return({stop, Reason, NChannel}, State) ->
+    stop(Reason, State#state{channel = NChannel});
+handle_return({stop, Reason, OutPacket, NChannel}, State) ->
+    NState = State#state{channel = NChannel},
+    NState1 = handle_outgoing(OutPacket, NState),
+    stop(Reason, NState1).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(Packet, State) ->
-    handle_outgoing(Packet, fun (_) -> ok end, State).
-
-handle_outgoing(Packets, SuccFun, State) when is_list(Packets) ->
-    send(lists:map(serialize_and_inc_stats_fun(State), Packets), SuccFun, State);
+handle_outgoing(Packets, State) when is_list(Packets) ->
+    send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
 
-handle_outgoing(Packet, SuccFun, State) ->
-    send((serialize_and_inc_stats_fun(State))(Packet), SuccFun, State).
+handle_outgoing(Packet, State) ->
+    send((serialize_and_inc_stats_fun(State))(Packet), State).
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet = ?PACKET(Type)) ->
@@ -514,39 +515,73 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 %%--------------------------------------------------------------------
 %% Send data
 
-send(IoData, SuccFun, State = #state{transport  = Transport,
-                                     socket     = Socket,
-                                     chan_state = ChanState}) ->
+send(IoData, State = #state{transport = Transport,
+                            socket    = Socket,
+                            channel   = Channel}) ->
     Oct = iolist_size(IoData),
     ok = emqx_metrics:inc('bytes.sent', Oct),
     case Transport:async_send(Socket, IoData) of
-        ok -> NChanState = emqx_channel:sent(Oct, ChanState),
-              SuccFun(State#state{chan_state = NChanState});
-        {error, Reason} ->
-            shutdown(Reason, State)
+        ok ->
+            {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
+            State#state{channel = NChannel};
+        Error = {error, _Reason} ->
+            %% Simulate an inet_reply to postpone handling the error
+            self() ! {inet_reply, Socket, Error}, State
     end.
 
 %%--------------------------------------------------------------------
-%% Handle timeout
+%% Handle Info
+
+handle_info({enter, _}, State = #state{active_n  = ActiveN,
+                                       sockstate = SockSt,
+                                       channel   = Channel}) ->
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = #{active_n  => ActiveN,
+                  sockstate => SockSt
+                 },
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    handle_info({register, Attrs, stats(State)}, State);
+
+handle_info({sockerr, _Reason}, #state{sockstate = closed}) -> ok;
+handle_info({sockerr, Reason}, State) ->
+    ?LOG(debug, "Socket error: ~p", [Reason]),
+    handle_info({sock_closed, Reason}, close_socket(State));
+
+handle_info(sock_closed, #state{sockstate = closed}) -> ok;
+handle_info(sock_closed, State) ->
+    ?LOG(debug, "Socket closed"),
+    handle_info({sock_closed, closed}, close_socket(State));
 
-handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {ok, Packets, NChanState} ->
-            handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState});
-        {close, Reason, NChanState} ->
-            close(Reason, State#state{chan_state = NChanState});
-        {close, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
+handle_info({close, Reason}, State) ->
+    ?LOG(debug, "Force close due to : ~p", [Reason]),
+    {ok, close_socket(State)};
+
+handle_info(Info, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_info(Info, Channel), State).
+
+%%--------------------------------------------------------------------
+%% Activate Socket
+
+-compile({inline, [activate_socket/1]}).
+activate_socket(State = #state{sockstate = closed}) ->
+    {ok, State};
+activate_socket(State = #state{sockstate = blocked}) ->
+    {ok, State};
+activate_socket(State = #state{transport = Transport,
+                               socket    = Socket,
+                               active_n  = N}) ->
+    case Transport:setopts(Socket, [{active, N}]) of
+        ok -> {ok, State#state{sockstate = running}};
+        Error -> Error
     end.
 
+%%--------------------------------------------------------------------
+%% Close Socket
+
+close_socket(State = #state{transport = Transport, socket = Socket}) ->
+    ok = Transport:fast_close(Socket),
+    State#state{sockstate = closed}.
+
 %%--------------------------------------------------------------------
 %% Ensure rate limit
 
@@ -568,22 +603,10 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
         {Pause, Rl1} ->
             ?LOG(debug, "Pause ~pms due to rate limit", [Pause]),
             TRef = erlang:send_after(Pause, self(), activate_socket),
-            NState = State#state{active_state = blocked,
-                                 limit_timer  = TRef
-                                },
+            NState = State#state{sockstate = blocked, limit_timer = TRef},
             setelement(Pos, NState, Rl1)
     end.
 
-%%--------------------------------------------------------------------
-%% Activate Socket
-
--compile({inline, [activate_socket/1]}).
-activate_socket(#state{active_state = blocked}) -> ok;
-activate_socket(#state{transport = Transport,
-                       socket    = Socket,
-                       active_n  = N}) ->
-    Transport:setopts(Socket, [{active, N}]).
-
 %%--------------------------------------------------------------------
 %% Inc incoming/outgoing stats
 
@@ -600,41 +623,25 @@ inc_incoming_stats(Type) when is_integer(Type) ->
 -compile({inline, [inc_outgoing_stats/1]}).
 inc_outgoing_stats(Type) ->
     emqx_pd:update_counter(send_pkt, 1),
-    (Type == ?PUBLISH)
-        andalso emqx_pd:update_counter(send_msg, 1).
+    (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1).
 
 %%--------------------------------------------------------------------
 %% Helper functions
 
--compile({inline,
-          [ reply/3
-          , keep_state/1
-          , keep_state/2
-          , next_event/2
-          , shutdown/2
-          , stop/2
-          ]}).
-
-reply(From, Reply, State) ->
-    {keep_state, State, [{reply, From, Reply}]}.
-
-keep_state(State) ->
-    {keep_state, State}.
-
-keep_state(State, Events) ->
-    {keep_state, State, Events}.
-
-next_event(Type, Content) ->
-    {next_event, Type, Content}.
-
-close(Reason, State = #state{transport = Transport, socket = Socket}) ->
-    ?LOG(warning, "Closed for ~p", [Reason]),
-    ok = Transport:fast_close(Socket),
-    {next_state, disconnected, State}.
+-compile({inline, [append_msg/2]}).
+append_msg(Msgs, Q) when is_list(Msgs) ->
+    lists:append(Msgs, Q);
+append_msg(Msg, Q) -> [Msg|Q].
 
-shutdown(Reason, State) ->
-    stop({shutdown, Reason}, State).
+-compile({inline, [next_msgs/1]}).
+next_msgs(Packet) when is_record(Packet, mqtt_packet) ->
+    {outgoing, Packet};
+next_msgs(Action) when is_tuple(Action) ->
+    Action;
+next_msgs(Actions) when is_list(Actions) ->
+    Actions.
 
+-compile({inline, [stop/2]}).
 stop(Reason, State) ->
     {stop, Reason, State}.
 

+ 4 - 1
src/emqx_types.erl

@@ -31,7 +31,9 @@
              , subid/0
              ]).
 
--export_type([ conninfo/0
+-export_type([ socktype/0
+             , sockstate/0
+             , conninfo/0
              , clientinfo/0
              , clientid/0
              , username/0
@@ -97,6 +99,7 @@
 -type(subid() :: binary() | atom()).
 
 -type(socktype() :: tcp | udp | ssl | proxy | atom()).
+-type(sockstate() :: idle | running | blocked | closed).
 -type(conninfo() :: #{socktype := socktype(),
                       sockname := peername(),
                       peername := peername(),

+ 129 - 138
src/emqx_ws_connection.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT WebSocket Connection
+%% MQTT/WS Connection
 -module(emqx_ws_connection).
 
 -include("emqx.hrl").
@@ -22,8 +22,9 @@
 -include("logger.hrl").
 -include("types.hrl").
 
--logger_header("[WsConnection]").
+-logger_header("[MQTT/WS]").
 
+%% API
 -export([ info/1
         , stats/1
         ]).
@@ -35,6 +36,7 @@
         , websocket_init/1
         , websocket_handle/2
         , websocket_info/2
+        , websocket_close/2
         , terminate/3
         ]).
 
@@ -43,14 +45,14 @@
           peername :: emqx_types:peername(),
           %% Sockname of the ws connection
           sockname :: emqx_types:peername(),
-          %% Conn state
-          conn_state :: idle | connected | disconnected,
+          %% Sock state
+          sockstate :: emqx_types:sockstate(),
           %% Parser State
           parse_state :: emqx_frame:parse_state(),
           %% Serialize function
           serialize :: emqx_frame:serialize_fun(),
-          %% Channel State
-          chan_state :: emqx_channel:channel(),
+          %% Channel
+          channel :: emqx_channel:channel(),
           %% Out Pending Packets
           pendings :: list(emqx_types:packet()),
           %% The stop reason
@@ -59,7 +61,7 @@
 
 -type(state() :: #state{}).
 
--define(INFO_KEYS, [socktype, peername, sockname, active_state]).
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 
@@ -70,8 +72,8 @@
 -spec(info(pid()|state()) -> emqx_types:infos()).
 info(WsPid) when is_pid(WsPid) ->
     call(WsPid, info);
-info(WsConn = #state{chan_state = ChanState}) ->
-    ChanInfo = emqx_channel:info(ChanState),
+info(WsConn = #state{channel = Channel}) ->
+    ChanInfo = emqx_channel:info(Channel),
     SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
     maps:merge(ChanInfo, #{sockinfo => SockInfo}).
 
@@ -83,18 +85,18 @@ info(peername, #state{peername = Peername}) ->
     Peername;
 info(sockname, #state{sockname = Sockname}) ->
     Sockname;
-info(active_state, _State) ->
-    running;
-info(chan_state, #state{chan_state = ChanState}) ->
-    emqx_channel:info(ChanState).
+info(sockstate, #state{sockstate = SockSt}) ->
+    SockSt;
+info(channel, #state{channel = Channel}) ->
+    emqx_channel:info(Channel).
 
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(WsPid) when is_pid(WsPid) ->
     call(WsPid, stats);
-stats(#state{chan_state = ChanState}) ->
+stats(#state{channel = Channel}) ->
     SockStats = emqx_pd:get_counters(?SOCK_STATS),
     ConnStats = emqx_pd:get_counters(?CONN_STATS),
-    ChanStats = emqx_channel:stats(ChanState),
+    ChanStats = emqx_channel:stats(Channel),
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
@@ -168,27 +170,26 @@ websocket_init([Req, Opts]) ->
     FrameOpts = emqx_zone:frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_fun(),
-    ChanState = emqx_channel:init(ConnInfo, Opts),
+    Channel = emqx_channel:init(ConnInfo, Opts),
     emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
     {ok, #state{peername    = Peername,
                 sockname    = Sockname,
-                conn_state  = idle,
+                sockstate   = idle,
                 parse_state = ParseState,
                 serialize   = Serialize,
-                chan_state  = ChanState,
+                channel     = Channel,
                 pendings    = []
                }}.
 
 websocket_handle({binary, Data}, State) when is_list(Data) ->
     websocket_handle({binary, iolist_to_binary(Data)}, State);
 
-websocket_handle({binary, Data}, State = #state{chan_state = ChanState}) ->
+websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
     ?LOG(debug, "RECV ~p", [Data]),
     Oct = iolist_size(Data),
     ok = inc_recv_stats(1, Oct),
-    NChanState = emqx_channel:received(Oct, ChanState),
-    NState = State#state{chan_state = NChanState},
-    process_incoming(Data, NState);
+    {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
+    process_incoming(Data, State#state{channel = NChannel});
 
 %% Pings should be replied with pongs, cowboy does it automatically
 %% Pongs can be safely ignored. Clause here simply prevents crash.
@@ -203,56 +204,27 @@ websocket_handle({FrameType, _}, State) ->
     ?LOG(error, "Unexpected frame - ~p", [FrameType]),
     stop({shutdown, unexpected_ws_frame}, State).
 
-websocket_info({call, From, info}, State) ->
-    gen_server:reply(From, info(State)),
-    {ok, State};
+websocket_info({call, From, Req}, State) ->
+    handle_call(From, Req, State);
 
-websocket_info({call, From, stats}, State) ->
-    gen_server:reply(From, stats(State)),
-    {ok, State};
-
-websocket_info({call, From, state}, State) ->
-    gen_server:reply(From, State),
-    {ok, State};
-
-websocket_info({call, From, Req}, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_call(Req, ChanState) of
-        {ok, Reply, NChanState} ->
-            _ = gen_server:reply(From, Reply),
-            {ok, State#state{chan_state = NChanState}};
-        {stop, Reason, Reply, NChanState} ->
-            _ = gen_server:reply(From, Reply),
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
-
-websocket_info({cast, Msg}, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_info(Msg, ChanState) of
-        ok -> {ok, State};
-        {ok, NChanState} ->
-            {ok, State#state{chan_state = NChanState}};
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+websocket_info({cast, Msg}, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_info(Msg, Channel), State);
 
 websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
-    NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt)},
-    handle_incoming(Packet, fun connected/1, NState);
-
-websocket_info({incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
-    handle_incoming(Packet, fun reply/1, State);
+    Serialize = emqx_frame:serialize_fun(ConnPkt),
+    NState = State#state{sockstate = running,
+                         serialize = Serialize
+                        },
+    handle_incoming(Packet, NState);
 
-websocket_info({incoming, FrameError = {frame_error, _Reason}}, State) ->
-    handle_incoming(FrameError, State);
+websocket_info({incoming, Packet}, State) ->
+    handle_incoming(Packet, State);
 
 websocket_info(Deliver = {deliver, _Topic, _Msg},
-               State = #state{chan_state = ChanState}) ->
+               State = #state{channel = Channel}) ->
     Delivers = emqx_misc:drain_deliver([Deliver]),
-    case emqx_channel:handle_out({deliver, Delivers}, ChanState) of
-        {ok, NChanState} ->
-            reply(State#state{chan_state = NChanState});
-        {ok, Packets, NChanState} ->
-            reply(enqueue(Packets, State#state{chan_state = NChanState}))
-    end;
+    Result = emqx_channel:handle_out(Delivers, Channel),
+    handle_return(Result, State);
 
 websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) ->
     RecvOct = emqx_pd:get_counter(recv_oct),
@@ -264,60 +236,70 @@ websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
 websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
     handle_timeout(TRef, Msg, State);
 
+websocket_info({close, Reason}, State) ->
+    stop({shutdown, Reason}, State);
+
 websocket_info({shutdown, Reason}, State) ->
     stop({shutdown, Reason}, State);
 
 websocket_info({stop, Reason}, State) ->
     stop(Reason, State);
 
-websocket_info(Info, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_info(Info, ChanState) of
-        {ok, NChanState} ->
-            {ok, State#state{chan_state = NChanState}};
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end.
+websocket_info(Info, State) ->
+    handle_info(Info, State).
 
-terminate(SockError, _Req, #state{chan_state  = ChanState,
+websocket_close(Reason, State) ->
+    ?LOG(debug, "WebSocket closed due to ~p~n", [Reason]),
+    handle_info({sock_closed, Reason}, State).
+
+terminate(SockError, _Req, #state{channel = Channel,
                                   stop_reason = Reason}) ->
     ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]),
-    emqx_channel:terminate(Reason, ChanState).
+    emqx_channel:terminate(Reason, Channel).
 
 %%--------------------------------------------------------------------
-%% Connected callback
+%% Handle call
 
-connected(State = #state{chan_state = ChanState}) ->
-    ChanAttrs = emqx_channel:attrs(ChanState),
-    SockAttrs = #{active_state => running},
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState),
-    reply(State#state{conn_state = connected}).
+handle_call(From, info, State) ->
+    gen_server:reply(From, info(State)),
+    {ok, State};
 
-%%--------------------------------------------------------------------
-%% Close
+handle_call(From, stats, State) ->
+    gen_server:reply(From, stats(State)),
+    {ok, State};
 
-close(Reason, State) ->
-    ?LOG(warning, "Closed for ~p", [Reason]),
-    reply(State#state{conn_state = disconnected}).
+handle_call(From, Req, State = #state{channel = Channel}) ->
+    case emqx_channel:handle_call(Req, Channel) of
+        {reply, Reply, NChannel} ->
+            _ = gen_server:reply(From, Reply),
+            {ok, State#state{channel = NChannel}};
+        {stop, Reason, Reply, NChannel} ->
+            _ = gen_server:reply(From, Reply),
+            stop(Reason, State#state{channel = NChannel});
+        {stop, Reason, Reply, OutPacket, NChannel} ->
+            gen_server:reply(From, Reply),
+            NState = State#state{channel = NChannel},
+            stop(Reason, enqueue(OutPacket, NState))
+    end.
 
 %%--------------------------------------------------------------------
 %% Handle timeout
 
-handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
-        {ok, NChanState} ->
-            {ok, State#state{chan_state = NChanState}};
-        {ok, Packets, NChanState} ->
-            NState = State#state{chan_state = NChanState},
-            reply(enqueue(Packets, NState));
-        {close, Reason, NChanState} ->
-            close(Reason, State#state{chan_state = NChanState});
-        {close, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            close(Reason, enqueue(OutPackets, NState));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end.
+handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
+
+%%--------------------------------------------------------------------
+%% Handle Info
+
+handle_info({enter, _}, State = #state{channel = Channel}) ->
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
+    reply(State);
+
+handle_info(Info, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_info(Info, Channel), State).
 
 %%--------------------------------------------------------------------
 %% Process incoming data
@@ -343,48 +325,39 @@ process_incoming(Data, State = #state{parse_state = ParseState}) ->
 %%--------------------------------------------------------------------
 %% Handle incoming packets
 
-handle_incoming(Packet = ?PACKET(Type), SuccFun,
-                State = #state{chan_state = ChanState}) ->
+handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
     _ = inc_incoming_stats(Type),
     _ = emqx_metrics:inc_recv(Packet),
     ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
-    case emqx_channel:handle_in(Packet, ChanState) of
-        {ok, NChanState} ->
-            SuccFun(State#state{chan_state= NChanState});
-        {ok, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            SuccFun(enqueue(OutPackets, NState));
-        {close, Reason, NChanState} ->
-            close(Reason, State#state{chan_state = NChanState});
-        {close, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            close(Reason, enqueue(OutPackets, NState));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state= NChanState},
-            stop(Reason, enqueue(OutPackets, NState))
-    end.
+    handle_return(emqx_channel:handle_in(Packet, Channel), State);
 
-handle_incoming(FrameError = {frame_error, _Reason},
-                State = #state{chan_state = ChanState}) ->
-    case emqx_channel:handle_in(FrameError, ChanState) of
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState});
-        {stop, Reason, OutPackets, NChanState} ->
-            NState = State#state{chan_state = NChanState},
-            stop(Reason, enqueue(OutPackets, NState))
-    end.
+handle_incoming(FrameError, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_in(FrameError, Channel), State).
+
+%%--------------------------------------------------------------------
+%% Handle channel return
+
+handle_return(ok, State) ->
+    reply(State);
+handle_return({ok, NChannel}, State) ->
+    reply(State#state{channel= NChannel});
+handle_return({ok, Replies, NChannel}, State) ->
+    reply(Replies, State#state{channel= NChannel});
+handle_return({stop, Reason, NChannel}, State) ->
+    stop(Reason, State#state{channel = NChannel});
+handle_return({stop, Reason, OutPacket, NChannel}, State) ->
+    NState = State#state{channel = NChannel},
+    stop(Reason, enqueue(OutPacket, NState)).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(Packets, State = #state{chan_state = ChanState}) ->
+handle_outgoing(Packets, State = #state{channel = Channel}) ->
     IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
     Oct = iolist_size(IoData),
     ok = inc_sent_stats(length(Packets), Oct),
-    NChanState = emqx_channel:sent(Oct, ChanState),
-    {{binary, IoData}, State#state{chan_state = NChanState}}.
+    {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
+    {{binary, IoData}, State#state{channel = NChannel}}.
 
 %% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
@@ -433,7 +406,25 @@ inc_sent_stats(Cnt, Oct) ->
 %%--------------------------------------------------------------------
 %% Reply or Stop
 
--compile({inline, [reply/1]}).
+reply(Packet, State) when is_record(Packet, mqtt_packet) ->
+    reply(enqueue(Packet, State));
+reply({outgoing, Packets}, State) ->
+    reply(enqueue(Packets, State));
+reply(Close = {close, _Reason}, State) ->
+    self() ! Close,
+    reply(State);
+
+reply([], State) ->
+    reply(State);
+reply([Packet|More], State) when is_record(Packet, mqtt_packet) ->
+    reply(More, enqueue(Packet, State));
+reply([{outgoing, Packets}|More], State) ->
+    reply(More, enqueue(Packets, State));
+reply([Other|More], State) ->
+    self() ! Other,
+    reply(More, State).
+
+-compile({inline, [reply/1, enqueue/2]}).
 
 reply(State = #state{pendings = []}) ->
     {ok, State};
@@ -441,6 +432,11 @@ reply(State = #state{pendings = Pendings}) ->
     {Reply, NState} = handle_outgoing(Pendings, State),
     {reply, Reply, NState#state{pendings = []}}.
 
+enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
+    enqueue([Packet], State);
+enqueue(Packets, State = #state{pendings = Pendings}) ->
+    State#state{pendings = lists:append(Pendings, Packets)}.
+
 stop(Reason, State = #state{pendings = []}) ->
     {stop, State#state{stop_reason = Reason}};
 stop(Reason, State = #state{pendings = Pendings}) ->
@@ -448,8 +444,3 @@ stop(Reason, State = #state{pendings = Pendings}) ->
     State2 = State1#state{pendings = [], stop_reason = Reason},
     {reply, [Reply, close], State2}.
 
-enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
-    enqueue([Packet], State);
-enqueue(Packets, State = #state{pendings = Pendings}) ->
-    State#state{pendings = lists:append(Pendings, Packets)}.
-

+ 17 - 11
test/emqx_channel_SUITE.erl

@@ -64,17 +64,18 @@ t_handle_connect(_) ->
                  is_bridge   = false,
                  clean_start = true,
                  keepalive   = 30,
-                 properties  = #{},
+                 properties  = undefined,
                  clientid    = <<"clientid">>,
                  username    = <<"username">>,
                  password    = <<"passwd">>
                 },
     with_channel(
       fun(Channel) ->
-              {ok, ?CONNACK_PACKET(?RC_SUCCESS), Channel1}
-                = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
-              #{clientid := ClientId, username := Username}
-                = emqx_channel:info(clientinfo, Channel1),
+              ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
+              ExpectedOutput = [{outgoing, ConnAck},{enter, connected}],
+              {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
+              ?assertEqual(ExpectedOutput, Output),
+              #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1),
               ?assertEqual(<<"clientid">>, ClientId),
               ?assertEqual(<<"username">>, Username)
       end).
@@ -180,7 +181,7 @@ t_handle_in_auth(_) ->
 %%--------------------------------------------------------------------
 
 t_handle_deliver(_) ->
-    with_channel(
+    with_connected_channel(
       fun(Channel) ->
               TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}],
               {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1}
@@ -188,7 +189,7 @@ t_handle_deliver(_) ->
               Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>),
               Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>),
               Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
-              {ok, Packets, _Ch} = emqx_channel:handle_out({deliver, Delivers}, Channel1),
+              {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1),
               ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets])
       end).
 
@@ -206,10 +207,9 @@ t_handle_out_connack(_) ->
                 },
     with_channel(
       fun(Channel) ->
-              {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, _), _}
+              {ok, [{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, {enter, connected}], _Chan}
                 = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
-              {stop, {shutdown, not_authorized},
-               ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
+              {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
                 = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
       end).
 
@@ -220,7 +220,7 @@ t_handle_out_publish(_) ->
               Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
               {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel),
               {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel),
-              {ok, Packets, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
+              {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
               ?assertEqual(2, length(Packets)),
               ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1))
       end).
@@ -304,6 +304,12 @@ t_terminate(_) ->
 %% Helper functions
 %%--------------------------------------------------------------------
 
+with_connected_channel(TestFun) ->
+    with_channel(
+      fun(Channel) ->
+          TestFun(emqx_channel:set_field(conn_state, connected, Channel))
+      end).
+
 with_channel(TestFun) ->
     with_channel(#{}, TestFun).
 

+ 1 - 0
test/emqx_pool_SUITE.erl

@@ -39,6 +39,7 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
+    ok = emqx_logger:set_log_level(emergency),
     application:ensure_all_started(gproc),
     Config.