Bladeren bron

Merge branch 'connection-test-using-meck' into develop

Feng Lee 6 jaren geleden
bovenliggende
commit
fca7a7e761

+ 18 - 33
etc/emqx.conf

@@ -562,8 +562,8 @@ zone.external.hibernate_after = 60s
 ## Publish limit for the external MQTT connections.
 ##
 ## Value: Number,Duration
-## Example: 10 messages per minute.
-## zone.external.publish_limit = 10,1m
+## Example: 100 messages per 10 seconds.
+## zone.external.publish_limit = 100,10s
 
 ## Enable ACL check.
 ##
@@ -874,14 +874,11 @@ listener.tcp.external.active_n = 100
 ## Value: String
 listener.tcp.external.zone = external
 
-## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
+## Rate limit for the external MQTT/TCP connections. Format is 'limit,duration'.
 ##
-## Value: rate,burst
-##   - rate: The average limit value for per second
-##   - burst: The maximum allowed for each check, To avoid frequent restriction
-##            this value is recommended to be set to `(max_packet_size * active_n)/2`
-## Unit: Bps
-## listener.tcp.external.rate_limit = 1024,52428800
+## Value: limit,duration
+## Default: 100KB incoming per 10 seconds.
+## listener.tcp.external.rate_limit = 100KB,10s
 
 ## The access control rules for the MQTT/TCP listener.
 ##
@@ -1010,12 +1007,9 @@ listener.tcp.internal.zone = internal
 ##
 ## See: listener.tcp.$name.rate_limit
 ##
-## Value: rate,burst
-##   - rate: The average limit value for per second
-##   - burst: The maximum allowed for each check, To avoid frequent restriction
-##            this value is recommended to be set to `(max_packet_size * active_n)/2`
-## Unit: Bps
-## listener.tcp.internal.rate_limit = 1000000,524288000
+## Value: limit,duration
+## Default: 1MB incoming per second.
+## listener.tcp.internal.rate_limit = 1MB,1s
 
 ## The TCP backlog of internal MQTT/TCP Listener.
 ##
@@ -1123,12 +1117,9 @@ listener.ssl.external.access.1 = allow all
 
 ## Rate limit for the external MQTT/SSL connections.
 ##
-## Value: rate,burst
-##   - rate: The average limit value for per second
-##   - burst: The maximum allowed for each check, To avoid frequent restriction
-##            this value is recommended to be set to `(max_packet_size * active_n)/2`
-## Unit: Bps
-## listener.ssl.external.rate_limit = 1024,52428800
+## Value: limit,duration
+## Default: 100KB incoming per 10 seconds.
+## listener.ssl.external.rate_limit = 100KB,10s
 
 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
 ## HAProxy or Nginx.
@@ -1360,12 +1351,9 @@ listener.ws.external.max_conn_rate = 1000
 
 ## Rate limit for the MQTT/WebSocket connections.
 ##
-## Value: rate,burst
-##   - rate: The average limit value for per second
-##   - burst: The maximum allowed for each check, To avoid frequent restriction
-##            this value is recommended to be set to `(max_packet_size * 1)/2`
-## Unit: Bps
-## listener.ws.external.rate_limit = 1024,524288
+## Value: limit,duration
+## Default: 100KB incoming per 10 seconds.
+## listener.ws.external.rate_limit = 100KB,10s
 
 ## Zone of the external MQTT/WebSocket listener belonged to.
 ##
@@ -1571,12 +1559,9 @@ listener.wss.external.max_conn_rate = 1000
 
 ## Rate limit for the MQTT/WebSocket/SSL connections.
 ##
-## Value: rate,burst
-##   - rate: The average limit value for per second
-##   - burst: The maximum allowed for each check, To avoid frequent restriction
-##            this value is recommended to be set to `(max_packet_size * 1)/2`
-## Unit: Bps
-## listener.wss.external.rate_limit = 1024,524288
+## Value: limit,duration
+## Default: 100KB incoming per 10 seconds.
+## listener.wss.external.rate_limit = 100KB,10s
 
 ## Zone of the external MQTT/WebSocket/SSL listener belonged to.
 ##

+ 22 - 12
priv/emqx.schema

@@ -939,14 +939,14 @@ end}.
                ("shared_subscription", Val) ->
                     {shared_subscription, Val};
                ("publish_limit", Val) ->
-                    [Limit, Duration] = string:tokens(Val, ", "),
-                    PubLimit = case cuttlefish_duration:parse(Duration, s) of
-                                   Secs when is_integer(Secs) ->
-                                       {list_to_integer(Limit) / Secs, list_to_integer(Limit)};
-                                   {error, Reason} ->
-                                       error(Reason)
+                    [L, D] = string:tokens(Val, ", "),
+                    Limit = list_to_integer(L),
+                    Duration = case cuttlefish_duration:parse(D, s) of
+                                   Secs when is_integer(Secs) -> Secs;
+                                   {error, Reason} -> error(Reason)
                                end,
-                    {publish_limit, PubLimit};
+                    Rate = Limit / Duration,
+                    {publish_limit, {Rate, Limit}};
                ("force_gc_policy", Val) ->
                     [Count, Bytes] = string:tokens(Val, "| "),
                     GcPolicy = case cuttlefish_bytesize:parse(Bytes) of
@@ -1644,10 +1644,20 @@ end}.
                   end
               end,
 
-    Ratelimit = fun(undefined) ->
-                    undefined;
-                   (S) ->
-                    list_to_tuple([list_to_integer(Token) || Token <- string:tokens(S, ",")])
+    RateLimit = fun(undefined) ->
+                        undefined;
+                   (Val) ->
+                        [L, D] = string:tokens(Val, ", "),
+                        Limit = case cuttlefish_bytesize:parse(L) of
+                                    Sz when is_integer(Sz) -> Sz;
+                                    {error, Reason} -> error(Reason)
+                                end,
+                        Duration = case cuttlefish_duration:parse(D, s) of
+                                       Secs when is_integer(Secs) -> Secs;
+                                       {error, Reason1} -> error(Reason1)
+                                   end,
+                        Rate = Limit / Duration,
+                        {Rate, Limit}
                 end,
 
     LisOpts = fun(Prefix) ->
@@ -1658,7 +1668,7 @@ end}.
                           {active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
                           {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
                           {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
-                          {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
+                          {rate_limit, RateLimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
                           {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
                           {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
                           {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},

+ 1 - 0
src/emqx_broker.erl

@@ -486,3 +486,4 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
+

+ 133 - 104
src/emqx_channel.erl

@@ -24,6 +24,11 @@
 
 -logger_header("[Channel]").
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 -export([ info/1
         , info/2
         , attrs/1
@@ -31,9 +36,6 @@
         , caps/1
         ]).
 
-%% Test Exports
--export([set_field/3]).
-
 -export([ init/2
         , handle_in/2
         , handle_out/2
@@ -43,6 +45,13 @@
         , terminate/2
         ]).
 
+-export([ recvd/2
+        , sent/2
+        ]).
+
+%% export for ct
+-export([set_field/3]).
+
 -import(emqx_misc,
         [ run_fold/3
         , pipeline/3
@@ -139,6 +148,8 @@ info(will_msg, #channel{will_msg = WillMsg}) ->
     emqx_message:to_map(WillMsg);
 info(pub_stats, #channel{pub_stats = PubStats}) ->
     PubStats;
+info(timers, #channel{timers = Timers}) ->
+    Timers;
 info(gc_state, #channel{gc_state = GcState}) ->
     maybe_apply(fun emqx_gc:info/1, GcState).
 
@@ -219,17 +230,17 @@ init_gc_state(Zone) ->
 %% Handle incoming packet
 %%--------------------------------------------------------------------
 
--spec(handle_in(Bytes :: pos_integer() | emqx_types:packet(), channel())
+-spec(recvd(pos_integer(), channel()) -> channel()).
+recvd(Bytes, Channel) ->
+    ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
+
+-spec(handle_in(emqx_types:packet(), channel())
       -> {ok, channel()}
        | {ok, output(), channel()}
-       | {stop, Reason :: term(), channel()}
-       | {stop, Reason :: term(), output(), channel()}).
-handle_in(Bytes, Channel) when is_integer(Bytes) ->
-    NChannel = maybe_gc_and_check_oom(Bytes, Channel),
-    {ok, ensure_timer(stats_timer, NChannel)};
-
+       | {shutdown, Reason :: term(), channel()}
+       | {shutdown, Reason :: term(), output(), channel()}).
 handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) ->
-     handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
+     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
 
 handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
     case pipeline([fun enrich_conninfo/2,
@@ -243,7 +254,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
             process_connect(NConnPkt, NChannel);
         {error, ReasonCode, NChannel} ->
             ReasonName = emqx_reason_codes:formalized(connack, ReasonCode),
-            handle_out({connack, ReasonName, ConnPkt}, NChannel)
+            handle_out(connack, {ReasonName, ConnPkt}, NChannel)
     end;
 
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
@@ -251,7 +262,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
     case emqx_packet:check(Packet) of
         ok -> handle_publish(Packet, NChannel);
         {error, ReasonCode} ->
-            handle_out({disconnect, ReasonCode}, NChannel)
+            handle_out(disconnect, ReasonCode, NChannel)
     end;
 
 handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
@@ -281,26 +292,27 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
         {ok, Msg, NSession} ->
             ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
             NChannel = Channel1#channel{session = NSession},
-            handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel);
+            handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
             ok = emqx_metrics:inc('packets.pubrec.inuse'),
-            handle_out({pubrel, PacketId, RC}, Channel1);
+            handle_out(pubrel, {PacketId, RC}, Channel1);
         {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
             ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
             ok = emqx_metrics:inc('packets.pubrec.missed'),
-            handle_out({pubrel, PacketId, RC}, Channel1)
+            handle_out(pubrel, {PacketId, RC}, Channel1)
     end;
 
 handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
     Channel1 = inc_pub_stats(pubrel_in, Channel),
     case emqx_session:pubrel(PacketId, Session) of
         {ok, NSession} ->
-            handle_out({pubcomp, PacketId, ?RC_SUCCESS}, Channel1#channel{session = NSession});
+            Channel2 = Channel1#channel{session = NSession},
+            handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2);
         {error, NotFound} ->
             ok = emqx_metrics:inc('packets.pubrel.missed'),
             ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
-            handle_out({pubcomp, PacketId, NotFound}, Channel1)
+            handle_out(pubcomp, {PacketId, NotFound}, Channel1)
     end;
 
 handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
@@ -324,9 +336,9 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
                                                   parse_topic_filters(TopicFilters)),
               TopicFilters2 = enrich_subid(Properties, TopicFilters1),
               {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
-              handle_out({suback, PacketId, ReasonCodes}, NChannel);
+              handle_out(suback, {PacketId, ReasonCodes}, NChannel);
         {error, ReasonCode} ->
-            handle_out({disconnect, ReasonCode}, Channel)
+            handle_out(disconnect, ReasonCode, Channel)
     end;
 
 handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
@@ -336,9 +348,9 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
                                                   [ClientInfo, Properties],
                                                   parse_topic_filters(TopicFilters)),
               {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
-              handle_out({unsuback, PacketId, ReasonCodes}, NChannel);
+              handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
         {error, ReasonCode} ->
-            handle_out({disconnect, ReasonCode}, Channel)
+            handle_out(disconnect, ReasonCode, Channel)
     end;
 
 handle_in(?PACKET(?PINGREQ), Channel) ->
@@ -355,7 +367,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
     Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Properties, OldInterval),
     if
         OldInterval == 0 andalso Interval > OldInterval ->
-            handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel1);
+            handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel1);
         Interval == 0 ->
             shutdown(ReasonName, Channel1);
         true ->
@@ -364,7 +376,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
     end;
 
 handle_in(?AUTH_PACKET(), Channel) ->
-    handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel);
+    handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
 
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
@@ -373,7 +385,7 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
     shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
 
 handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) ->
-    handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel);
+    handle_out(disconnect, ?RC_MALFORMED_PACKET, Channel);
 
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
     ?LOG(error, "Unexpected frame error: ~p", [Reason]),
@@ -381,7 +393,7 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected})
 
 handle_in(Packet, Channel) ->
     ?LOG(error, "Unexpected incoming: ~p", [Packet]),
-    handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel).
+    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
 
 %%--------------------------------------------------------------------
 %% Process Connect
@@ -392,18 +404,18 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
     case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
         {ok, #{session := Session, present := false}} ->
             NChannel = Channel#channel{session = Session},
-            handle_out({connack, ?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
+            handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
         {ok, #{session := Session, present := true, pendings := Pendings}} ->
             %%TODO: improve later.
             NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
             NChannel = Channel#channel{session  = Session,
                                        resuming = true,
                                        pendings = NPendings},
-            handle_out({connack, ?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
+            handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
         {error, Reason} ->
             %% TODO: Unknown error?
             ?LOG(error, "Failed to open session: ~p", [Reason]),
-            handle_out({connack, ?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
+            handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
     end.
 
 %%--------------------------------------------------------------------
@@ -426,7 +438,7 @@ handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
         {error, ReasonCode, NChannel} ->
             ?LOG(warning, "Cannot publish message to ~s due to ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
-            handle_out({disconnect, ReasonCode}, NChannel)
+            handle_out(disconnect, ReasonCode, NChannel)
     end.
 
 process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) ->
@@ -442,7 +454,7 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
                      [] -> ?RC_NO_MATCHING_SUBSCRIBERS;
                      _  -> ?RC_SUCCESS
                  end,
-    handle_out({puback, PacketId, ReasonCode}, Channel);
+    handle_out(puback, {PacketId, ReasonCode}, Channel);
 
 process_publish(PacketId, Msg = #message{qos = ?QOS_2},
                 Channel = #channel{session = Session}) ->
@@ -453,14 +465,14 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
                      _  -> ?RC_SUCCESS
                  end,
             NChannel = Channel#channel{session = NSession},
-            handle_out({pubrec, PacketId, RC}, ensure_timer(await_timer, NChannel));
+            handle_out(pubrec, {PacketId, RC}, ensure_timer(await_timer, NChannel));
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ok = emqx_metrics:inc('packets.publish.inuse'),
-            handle_out({pubrec, PacketId, RC}, Channel);
+            handle_out(pubrec, {PacketId, RC}, Channel);
         {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
             ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full", [PacketId]),
             ok = emqx_metrics:inc('messages.qos2.dropped'),
-            handle_out({pubrec, PacketId, RC}, Channel)
+            handle_out(pubrec, {PacketId, RC}, Channel)
     end.
 
 publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
@@ -528,15 +540,15 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
 
--spec(handle_out(integer()|term(), channel())
+-spec(sent(pos_integer(), channel()) -> channel()).
+sent(Bytes, Channel) ->
+    ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
+
+-spec(handle_out(term(), channel())
       -> {ok, channel()}
        | {ok, output(), channel()}
-       | {stop, Reason :: term(), channel()}
-       | {stop, Reason :: term(), output(), channel()}).
-handle_out(Bytes, Channel) when is_integer(Bytes) ->
-    NChannel = maybe_gc_and_check_oom(Bytes, Channel),
-    {ok, ensure_timer(stats_timer, NChannel)};
-
+       | {shutdown, Reason :: term(), channel()}
+       | {shutdown, Reason :: term(), output(), channel()}).
 handle_out(Delivers, Channel = #channel{conn_state = disconnected,
                                         session = Session})
   when is_list(Delivers) ->
@@ -557,7 +569,36 @@ handle_out(Delivers, Channel = #channel{session = Session}) when is_list(Deliver
             {ok, Channel#channel{session = NSession}}
     end;
 
-handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
+handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
+    Packets = lists:foldl(
+                fun(Publish, Acc) ->
+                    case handle_out(Publish, Channel) of
+                        {ok, Packet, _Ch} ->
+                            [Packet|Acc];
+                        {ok, _Ch} -> Acc
+                    end
+                end, [], Publishes),
+    NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
+    {ok, {outgoing, lists:reverse(Packets)}, NChannel};
+
+%% Ignore loop deliver
+handle_out({publish, _PacketId, #message{from  = ClientId,
+                                         flags = #{nl := true}}},
+           Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
+    {ok, Channel};
+
+handle_out({publish, PacketId, Msg}, Channel =
+           #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}}) ->
+    Msg1 = emqx_message:update_expiry(Msg),
+    Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1),
+    Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
+    {ok, emqx_message:to_packet(PacketId, Msg3), Channel};
+
+handle_out(Data, Channel) ->
+    ?LOG(error, "Unexpected outgoing: ~p", [Data]),
+    {ok, Channel}.
+
+handle_out(connack, {?RC_SUCCESS, SP, ConnPkt},
            Channel = #channel{conninfo   = ConnInfo,
                               clientinfo = ClientInfo}) ->
     AckProps = run_fold([fun enrich_caps/2,
@@ -574,16 +615,16 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
     AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
     case maybe_resume_session(Channel2) of
         ignore ->
-            {ok, [{enter, connected}, {outgoing, AckPacket}], Channel2};
+            {ok, [{connack, AckPacket}], Channel2};
         {ok, Publishes, NSession} ->
             Channel3 = Channel2#channel{session  = NSession,
                                         resuming = false,
                                         pendings = []},
             {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
-            {ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3}
+            {ok, [{connack, AckPacket}, {outgoing, Packets}], Channel3}
     end;
 
-handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
+handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
                                                                clientinfo = ClientInfo}) ->
     ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
     ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
@@ -593,65 +634,40 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
 
-handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
-    Packets = lists:foldl(
-                fun(Publish, Acc) ->
-                    case handle_out(Publish, Channel) of
-                        {ok, Packet, _Ch} ->
-                            [Packet|Acc];
-                        {ok, _Ch} -> Acc
-                    end
-                end, [], Publishes),
-    NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
-    {ok, {outgoing, lists:reverse(Packets)}, NChannel};
-
-%% Ignore loop deliver
-handle_out({publish, _PacketId, #message{from  = ClientId,
-                                         flags = #{nl := true}}},
-           Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
-    {ok, Channel};
-
-handle_out({publish, PacketId, Msg}, Channel =
-           #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}}) ->
-    Msg1 = emqx_message:update_expiry(Msg),
-    Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1),
-    Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
-    {ok, emqx_message:to_packet(PacketId, Msg3), Channel};
-
-handle_out({puback, PacketId, ReasonCode}, Channel) ->
+handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)};
 
-handle_out({pubrel, PacketId, ReasonCode}, Channel) ->
-    {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
-
-handle_out({pubrec, PacketId, ReasonCode}, Channel) ->
+handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
     {ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)};
 
-handle_out({pubcomp, PacketId, ReasonCode}, Channel) ->
+handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
+    {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
+
+handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
     {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)};
 
-handle_out({suback, PacketId, ReasonCodes},
+handle_out(suback, {PacketId, ReasonCodes},
            Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
     {ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel};
 
-handle_out({suback, PacketId, ReasonCodes}, Channel) ->
+handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
     ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
     {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
 
-handle_out({unsuback, PacketId, ReasonCodes},
+handle_out(unsuback, {PacketId, ReasonCodes},
            Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
     {ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel};
 
-handle_out({unsuback, PacketId, _ReasonCodes}, Channel) ->
+handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
     {ok, ?UNSUBACK_PACKET(PacketId), Channel};
 
-handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) ->
+handle_out(disconnect, ReasonCode, Channel = #channel{conninfo = #{proto_ver := ProtoVer}})
+  when is_integer(ReasonCode) ->
     ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer),
-    handle_out({disconnect, ReasonCode, ReasonName}, Channel);
+    handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
 
-handle_out({disconnect, ReasonCode, ReasonName},
-           Channel = #channel{conninfo = #{proto_ver := ProtoVer,
-                                           expiry_interval := ExpiryInterval}}) ->
+handle_out(disconnect, {ReasonCode, ReasonName}, Channel = #channel{conninfo = ConnInfo}) ->
+    #{proto_ver := ProtoVer, expiry_interval := ExpiryInterval} = ConnInfo,
     case {ExpiryInterval, ProtoVer} of
         {0, ?MQTT_PROTO_V5} ->
             shutdown(ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel);
@@ -673,7 +689,7 @@ handle_out({disconnect, ReasonCode, ReasonName},
             {ok, {close, ReasonName}, NChannel}
     end;
 
-handle_out({Type, Data}, Channel) ->
+handle_out(Type, Data, Channel) ->
     ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
     {ok, Channel}.
 
@@ -683,38 +699,40 @@ handle_out({Type, Data}, Channel) ->
 
 -spec(handle_call(Req :: term(), channel())
       -> {reply, Reply :: term(), channel()}
-       | {stop, Reason :: term(), Reply :: term(), channel()}).
+       | {shutdown, Reason :: term(), Reply :: term(), channel()}).
 handle_call(kick, Channel) ->
-    {stop, {shutdown, kicked}, ok, Channel};
+    shutdown(kicked, ok, Channel);
 
 handle_call(discard, Channel = #channel{conn_state = connected}) ->
     Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
-    {stop, {shutdown, discarded}, ok, Packet, Channel};
+    {shutdown, discarded, ok, Packet, Channel};
 
 handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
-    {stop, {shutdown, discarded}, ok, Channel};
+    shutdown(discarded, ok, Channel);
 
 %% Session Takeover
 handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
-    {reply, Session, Channel#channel{takeover = true}};
+    reply(Session, Channel#channel{takeover = true});
 
 handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
                                                   pendings = Pendings}) ->
     ok = emqx_session:takeover(Session),
+    %% TODO: Should not drain deliver here
     Delivers = emqx_misc:drain_deliver(),
     AllPendings = lists:append(Delivers, Pendings),
-    {stop, {shutdown, takeovered}, AllPendings, Channel};
+    shutdown(takeovered, AllPendings, Channel);
 
 handle_call(Req, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
-    {reply, ignored, Channel}.
+    reply(ignored, Channel).
 
 %%--------------------------------------------------------------------
 %% Handle Info
 %%--------------------------------------------------------------------
 
 -spec(handle_info(Info :: term(), channel())
-      -> ok | {ok, channel()} | {stop, Reason :: term(), channel()}).
+      -> ok | {ok, channel()}
+       | {shutdown, Reason :: term(), channel()}).
 handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
     TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
                                         [ClientInfo, #{'Internal' => true}],
@@ -769,7 +787,7 @@ handle_info(Info, Channel) ->
 -spec(handle_timeout(reference(), Msg :: term(), channel())
       -> {ok, channel()}
        | {ok, Result :: term(), channel()}
-       | {stop, Reason :: term(), channel()}).
+       | {shutdown, Reason :: term(), channel()}).
 handle_timeout(TRef, {emit_stats, Stats},
                Channel = #channel{clientinfo = #{clientid := ClientId},
                                   timers = #{stats_timer := TRef}}) ->
@@ -784,7 +802,7 @@ handle_timeout(TRef, {keepalive, StatVal},
             NChannel = Channel#channel{keepalive = NKeepalive},
             {ok, reset_timer(alive_timer, NChannel)};
         {error, timeout} ->
-            handle_out({disconnect, ?RC_KEEP_ALIVE_TIMEOUT}, Channel)
+            handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
     end;
 
 handle_timeout(TRef, retry_delivery,
@@ -862,7 +880,7 @@ interval(alive_timer, #channel{keepalive = KeepAlive}) ->
 interval(retry_timer, #channel{session = Session}) ->
     emqx_session:info(retry_interval, Session);
 interval(await_timer, #channel{session = Session}) ->
-    emqx_session:info(await_rel_timeout, Session);
+    emqx_session:info(awaiting_rel_timeout, Session);
 interval(expire_timer, #channel{conninfo = ConnInfo}) ->
     timer:seconds(maps:get(expiry_interval, ConnInfo));
 interval(will_timer, #channel{will_msg = WillMsg}) ->
@@ -1050,13 +1068,13 @@ check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
     end.
 
 %% Check Pub Alias
+%% TODO: Fixme later
 check_pub_alias(#mqtt_packet{
                    variable = #mqtt_packet_publish{
                                  properties = #{'Topic-Alias' := AliasId}
                                 }
                   },
                 #channel{alias_maximum = Limits}) ->
-    %% TODO: Move to Protocol
     case (Limits == undefined)
             orelse (Max = maps:get(inbound, Limits, 0)) == 0
                 orelse (AliasId > Max) of
@@ -1104,6 +1122,7 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri
     NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
     SubOpts#{rap => flag(IsBridge), nl => NL}.
 
+%% TODO: Default caps should not be returned.
 enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
                                clientinfo = #{zone := Zone}}) ->
     #{max_packet_size       := MaxPktSize,
@@ -1195,15 +1214,25 @@ maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone},
 %% Helper functions
 %%--------------------------------------------------------------------
 
+-compile({inline, [reply/2]}).
+reply(Reply, Channel) ->
+    {reply, Reply, Channel}.
+
+-compile({inline, [shutdown/2]}).
+shutdown(success, Channel) ->
+    shutdown(normal, Channel);
+shutdown(Reason, Channel) ->
+    {shutdown, Reason, Channel}.
+
+-compile({inline, [shutdown/3]}).
+shutdown(success, Reply, Channel) ->
+    shutdown(normal, Reply, Channel);
+shutdown(Reason, Reply, Channel) ->
+    {shutdown, Reason, Reply, Channel}.
+
 sp(true)  -> 1;
 sp(false) -> 0.
 
 flag(true)  -> 1;
 flag(false) -> 0.
 
-shutdown(Reason, Channel) ->
-    {stop, {shutdown, Reason}, Channel}.
-
-shutdown(Reason, Packets, Channel) ->
-    {stop, {shutdown, Reason}, Packets, Channel}.
-

+ 79 - 65
src/emqx_connection.erl

@@ -24,6 +24,11 @@
 
 -logger_header("[MQTT]").
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 %% API
 -export([ start_link/3
         , stop/1
@@ -121,9 +126,7 @@ info(active_n, #state{active_n = ActiveN}) ->
 info(pub_limit, #state{pub_limit = PubLimit}) ->
     limit_info(PubLimit);
 info(rate_limit, #state{rate_limit = RateLimit}) ->
-    limit_info(RateLimit);
-info(channel, #state{channel = Channel}) ->
-    emqx_channel:info(Channel).
+    limit_info(RateLimit).
 
 limit_info(Limit) ->
     emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
@@ -158,17 +161,9 @@ init(Parent, Transport, RawSocket, Options) ->
     case Transport:wait(RawSocket) of
         {ok, Socket} ->
             do_init(Parent, Transport, Socket, Options);
-        {error, Reason} when Reason =:= enotconn;
-                             Reason =:= einval;
-                             Reason =:= closed ->
-            Transport:fast_close(RawSocket),
-            exit(normal);
-        {error, timeout} ->
-            Transport:fast_close(RawSocket),
-            exit({shutdown, ssl_upgrade_timeout});
         {error, Reason} ->
-            Transport:fast_close(RawSocket),
-            exit(Reason)
+            ok = Transport:fast_close(RawSocket),
+            exit_on_sock_error(Reason)
     end.
 
 do_init(Parent, Transport, Socket, Options) ->
@@ -209,15 +204,10 @@ do_init(Parent, Transport, Socket, Options) ->
                   },
     case activate_socket(State) of
         {ok, NState} ->
-            recvloop(NState, #{hibernate_after => HibAfterTimeout});
-        {error, Reason} when Reason =:= einval;
-                             Reason =:= enotconn;
-                             Reason =:= closed ->
-            Transport:fast_close(Socket),
-            exit(normal);
+            hibernate(NState, #{hibernate_after => HibAfterTimeout});
         {error, Reason} ->
-            Transport:fast_close(Socket),
-            erlang:exit({shutdown, Reason})
+            ok = Transport:fast_close(Socket),
+            exit_on_sock_error(Reason)
     end.
 
 -compile({inline, [init_limiter/1]}).
@@ -225,6 +215,15 @@ init_limiter(undefined) -> undefined;
 init_limiter({Rate, Burst}) ->
     esockd_rate_limit:new(Rate, Burst).
 
+exit_on_sock_error(Reason) when Reason =:= einval;
+                                Reason =:= enotconn;
+                                Reason =:= closed ->
+    erlang:exit(normal);
+exit_on_sock_error(timeout) ->
+    erlang:exit({shutdown, ssl_upgrade_timeout});
+exit_on_sock_error(Reason) ->
+    erlang:exit({shutdown, Reason}).
+
 %%--------------------------------------------------------------------
 %% Recv Loop
 
@@ -291,8 +290,8 @@ handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
     Oct = iolist_size(Data),
     emqx_pd:update_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
-    {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
-    process_incoming(Data, State#state{channel = NChannel});
+    NChannel = emqx_channel:recvd(Oct, Channel),
+    parse_incoming(Data, State#state{channel = NChannel});
 
 handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
            State = #state{idle_timer = IdleTimer}) ->
@@ -316,24 +315,16 @@ handle_msg({Closed, _Sock}, State)
 
 handle_msg({Passive, _Sock}, State)
   when Passive == tcp_passive; Passive == ssl_passive ->
-    %% Rate limit and activate socket here.
+    %% Rate limit here:)
     NState = ensure_rate_limit(State),
-    case activate_socket(NState) of
-        {ok, NState} -> {ok, NState};
-        {error, Reason} ->
-            {ok, {sock_error, Reason}, NState}
-    end;
+    handle_info(activate_socket, NState);
 
 %% Rate limit timer expired.
 handle_msg(activate_socket, State) ->
     NState = State#state{sockstate   = idle,
                          limit_timer = undefined
                         },
-    case activate_socket(NState) of
-        {ok, NState} -> {ok, NState};
-        {error, Reason} ->
-            {ok, {sock_error, Reason}, State}
-    end;
+    handle_info(activate_socket, NState);
 
 handle_msg(Deliver = {deliver, _Topic, _Msg},
            State = #state{channel = Channel}) ->
@@ -342,7 +333,8 @@ handle_msg(Deliver = {deliver, _Topic, _Msg},
     handle_return(Result, State);
 
 handle_msg({outgoing, Packets}, State) ->
-    {ok, handle_outgoing(Packets, State)};
+    NState = handle_outgoing(Packets, State),
+    {ok, NState};
 
 %% something sent
 handle_msg({inet_reply, _Sock, ok}, _State) ->
@@ -362,13 +354,10 @@ handle_msg(Msg, State) -> handle_info(Msg, State).
 %%--------------------------------------------------------------------
 %% Terminate
 
-terminate(Reason, #state{transport = Transport,
-                         socket    = Socket,
-                         sockstate = SockSt,
-                         channel   = Channel}) ->
-    ?LOG(debug, "Terminated for ~p", [Reason]),
-    SockSt =:= closed orelse Transport:fast_close(Socket),
+terminate(Reason, State = #state{channel = Channel}) ->
+    ?LOG(debug, "Terminated due to ~p", [Reason]),
     emqx_channel:terminate(Reason, Channel),
+    close_socket(State),
     exit(Reason).
 
 %%--------------------------------------------------------------------
@@ -399,19 +388,19 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
     case emqx_channel:handle_call(Req, Channel) of
         {reply, Reply, NChannel} ->
             {reply, Reply, State#state{channel = NChannel}};
-        {stop, Reason, Reply, NChannel} ->
-            {stop, Reason, Reply, State#state{channel = NChannel}};
-        {stop, Reason, Reply, OutPacket, NChannel} ->
+        {shutdown, Reason, Reply, NChannel} ->
+            shutdown(Reason, Reply, State#state{channel = NChannel});
+        {shutdown, Reason, Reply, OutPacket, NChannel} ->
             NState = State#state{channel = NChannel},
             NState1 = handle_outgoing(OutPacket, NState),
-            {stop, Reason, Reply, NState1}
+            shutdown(Reason, Reply, NState1)
     end.
 
 %%--------------------------------------------------------------------
 %% Handle timeout
 
 handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
-    stop(idle_timeout, State);
+    shutdown(idle_timeout, State);
 
 handle_timeout(TRef, emit_stats, State) ->
     handle_timeout(TRef, {emit_stats, stats(State)}, State);
@@ -422,23 +411,19 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport,
         {ok, [{recv_oct, RecvOct}]} ->
             handle_timeout(TRef, {keepalive, RecvOct}, State);
         {error, Reason} ->
-            handle_info({sockerr, Reason}, State)
+            handle_info({sock_error, Reason}, State)
     end;
 
 handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
     handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
 
 %%--------------------------------------------------------------------
-%% Process/Parse incoming data.
-
--compile({inline, [process_incoming/2]}).
-process_incoming(Data, State) ->
-    {Packets, NState} = parse_incoming(Data, State),
-    {ok, next_incoming_msgs(Packets), NState}.
+%% Parse incoming data
 
 -compile({inline, [parse_incoming/2]}).
 parse_incoming(Data, State) ->
-    parse_incoming(Data, [], State).
+    {Packets, NState} = parse_incoming(Data, [], State),
+    {ok, next_incoming_msgs(Packets), NState}.
 
 parse_incoming(<<>>, Packets, State) ->
     {Packets, State};
@@ -483,12 +468,12 @@ handle_return({ok, NChannel}, State) ->
     {ok, State#state{channel = NChannel}};
 handle_return({ok, Replies, NChannel}, State) ->
     {ok, next_msgs(Replies), State#state{channel = NChannel}};
-handle_return({stop, Reason, NChannel}, State) ->
-    stop(Reason, State#state{channel = NChannel});
-handle_return({stop, Reason, OutPacket, NChannel}, State) ->
+handle_return({shutdown, Reason, NChannel}, State) ->
+    shutdown(Reason, State#state{channel = NChannel});
+handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
     NState = State#state{channel = NChannel},
     NState1 = handle_outgoing(OutPacket, NState),
-    stop(Reason, NState1).
+    shutdown(Reason, NState1).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
@@ -522,8 +507,7 @@ send(IoData, State = #state{transport = Transport,
     ok = emqx_metrics:inc('bytes.sent', Oct),
     case Transport:async_send(Socket, IoData) of
         ok ->
-            {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
-            State#state{channel = NChannel};
+            State#state{channel = emqx_channel:sent(Oct, Channel)};
         Error = {error, _Reason} ->
             %% Simulate an inet_reply to postpone handling the error
             self() ! {inet_reply, Socket, Error}, State
@@ -532,9 +516,20 @@ send(IoData, State = #state{transport = Transport,
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({enter, _}, State = #state{active_n  = ActiveN,
-                                       sockstate = SockSt,
-                                       channel   = Channel}) ->
+handle_info({connack, ConnAck}, State = #state{active_n  = ActiveN,
+                                                sockstate = SockSt,
+                                                channel   = Channel}) ->
+    NState = handle_outgoing(ConnAck, State),
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = #{active_n  => ActiveN,
+                  sockstate => SockSt
+                 },
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    handle_info({register, Attrs, stats(State)}, NState);
+
+handle_info({enter, disconnected}, State = #state{active_n  = ActiveN,
+                                                sockstate = SockSt,
+                                                channel   = Channel}) ->
     ChanAttrs = emqx_channel:attrs(Channel),
     SockAttrs = #{active_n  => ActiveN,
                   sockstate => SockSt
@@ -542,8 +537,16 @@ handle_info({enter, _}, State = #state{active_n  = ActiveN,
     Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
     handle_info({register, Attrs, stats(State)}, State);
 
-handle_info({sockerr, _Reason}, #state{sockstate = closed}) -> ok;
-handle_info({sockerr, Reason}, State) ->
+handle_info(activate_socket, State) ->
+    case activate_socket(State) of
+        {ok, NState} -> {ok, NState};
+        {error, Reason} ->
+            handle_info({sock_error, Reason}, State)
+    end;
+
+%%TODO: this is not right
+handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> ok;
+handle_info({sock_error, Reason}, State) ->
     ?LOG(debug, "Socket error: ~p", [Reason]),
     handle_info({sock_closed, Reason}, close_socket(State));
 
@@ -578,6 +581,8 @@ activate_socket(State = #state{transport = Transport,
 %%--------------------------------------------------------------------
 %% Close Socket
 
+close_socket(State = #state{sockstate = closed}) ->
+    State;
 close_socket(State = #state{transport = Transport, socket = Socket}) ->
     ok = Transport:fast_close(Socket),
     State#state{sockstate = closed}.
@@ -641,7 +646,16 @@ next_msgs(Action) when is_tuple(Action) ->
 next_msgs(Actions) when is_list(Actions) ->
     Actions.
 
+shutdown(Reason, State) ->
+    stop({shutdown, Reason}, State).
+
+shutdown(Reason, Reply, State) ->
+    stop({shutdown, Reason}, Reply, State).
+
 -compile({inline, [stop/2]}).
 stop(Reason, State) ->
     {stop, Reason, State}.
 
+stop(Reason, Reply, State) ->
+    {stop, Reason, Reply, State}.
+

+ 54 - 30
src/emqx_session.erl

@@ -50,6 +50,11 @@
 
 -logger_header("[Session]").
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 -export([init/2]).
 
 -export([ info/1
@@ -58,9 +63,6 @@
         , stats/1
         ]).
 
-%% Exports for unit tests
--export([set_field/3]).
-
 -export([ subscribe/4
         , unsubscribe/3
         ]).
@@ -84,6 +86,9 @@
 
 -export([expire/2]).
 
+%% export for ct
+-export([set_field/3]).
+
 -export_type([session/0]).
 
 -import(emqx_zone, [get_env/3]).
@@ -95,27 +100,25 @@
           max_subscriptions :: non_neg_integer(),
           %% Upgrade QoS?
           upgrade_qos :: boolean(),
-          %% Client <- Broker:
-          %% Inflight QoS1, QoS2 messages sent to the client but unacked.
+          %% Client <- Broker: QoS1/2 messages sent to the client but unacked.
           inflight :: emqx_inflight:inflight(),
-          %% All QoS1, QoS2 messages published to when client is disconnected.
-          %% QoS 1 and QoS 2 messages pending transmission to the Client.
+          %% All QoS1/2 messages published to when client is disconnected,
+          %% or QoS1/2 messages pending transmission to the Client.
           %%
-          %% Optionally, QoS 0 messages pending transmission to the Client.
+          %% Optionally, QoS0 messages pending transmission to the Client.
           mqueue :: emqx_mqueue:mqueue(),
           %% Next packet id of the session
           next_pkt_id = 1 :: emqx_types:packet_id(),
           %% Retry interval for redelivering QoS1/2 messages
           retry_interval :: timeout(),
-          %% Client -> Broker:
-          %% Inflight QoS2 messages received from client and waiting for pubrel.
+          %% Client -> Broker: QoS2 messages received from client and waiting for pubrel.
           awaiting_rel :: map(),
           %% Max Packets Awaiting PUBREL
           max_awaiting_rel :: non_neg_integer(),
           %% Awaiting PUBREL Timeout
-          await_rel_timeout :: timeout(),
-          %% Enqueue Count
-          enqueue_cnt :: non_neg_integer(),
+          awaiting_rel_timeout :: timeout(),
+          %% Deliver Stats
+          deliver_stats :: emqx_types:stats(),
           %% Created at
           created_at :: pos_integer()
          }).
@@ -126,11 +129,13 @@
 
 -define(DEFAULT_BATCH_N, 1000).
 
--define(ATTR_KEYS, [inflight_max,
+-define(ATTR_KEYS, [inflight_cnt,
+                    inflight_max,
+                    mqueue_len,
                     mqueue_max,
                     retry_interval,
                     awaiting_rel_max,
-                    await_rel_timeout,
+                    awaiting_rel_timeout,
                     created_at
                    ]).
 
@@ -146,7 +151,7 @@
                     next_pkt_id,
                     awaiting_rel,
                     awaiting_rel_max,
-                    await_rel_timeout,
+                    awaiting_rel_timeout,
                     created_at
                    ]).
 
@@ -163,7 +168,7 @@
                     ]).
 
 %%--------------------------------------------------------------------
-%% Init a session
+%% Init a Session
 %%--------------------------------------------------------------------
 
 %% @doc Init a session.
@@ -178,11 +183,11 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
              retry_interval    = get_env(Zone, retry_interval, 0),
              awaiting_rel      = #{},
              max_awaiting_rel  = get_env(Zone, max_awaiting_rel, 100),
-             await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
-             enqueue_cnt       = 0,
+             awaiting_rel_timeout = get_env(Zone, awaiting_rel_timeout, 3600*1000),
              created_at        = erlang:system_time(second)
             }.
 
+%% @private init mq
 init_mqueue(Zone) ->
     emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
                        store_qos0 => get_env(Zone, mqueue_store_qos0, true),
@@ -215,6 +220,8 @@ info(subscriptions_max, #session{max_subscriptions = MaxSubs}) ->
 info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
     UpgradeQoS;
 info(inflight, #session{inflight = Inflight}) ->
+    Inflight;
+info(inflight_cnt, #session{inflight = Inflight}) ->
     emqx_inflight:size(Inflight);
 info(inflight_max, #session{inflight = Inflight}) ->
     emqx_inflight:max_size(Inflight);
@@ -229,13 +236,19 @@ info(mqueue_dropped, #session{mqueue = MQueue}) ->
 info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
     PacketId;
 info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
+    AwaitingRel;
+info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
     maps:size(AwaitingRel);
 info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) ->
     MaxAwaitingRel;
-info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
+info(awaiting_rel_timeout, #session{awaiting_rel_timeout = Timeout}) ->
     Timeout;
-info(enqueue_cnt, #session{enqueue_cnt = Cnt}) ->
-    Cnt;
+info(enqueue_cnt, #session{deliver_stats = undefined}) ->
+    0;
+info(enqueue_cnt, #session{deliver_stats = Stats}) ->
+    maps:get(enqueue_cnt, Stats, 0);
+info(deliver_stats, #session{deliver_stats = Stats}) ->
+    Stats;
 info(created_at, #session{created_at = CreatedAt}) ->
     CreatedAt.
 
@@ -329,8 +342,7 @@ unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -
 %%--------------------------------------------------------------------
 
 -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-      -> {ok, emqx_types:publish_result()} |
-         {ok, emqx_types:publish_result(), session()} |
+      -> {ok, emqx_types:publish_result(), session()} |
          {error, emqx_types:reason_code()}).
 publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
     case is_awaiting_full(Session) of
@@ -341,8 +353,8 @@ publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
     end;
 
 %% Publish QoS0/1 directly
-publish(_PacketId, Msg, _Session) ->
-    {ok, emqx_broker:publish(Msg)}.
+publish(_PacketId, Msg, Session) ->
+    {ok, emqx_broker:publish(Msg), Session}.
 
 is_awaiting_full(#session{max_awaiting_rel = 0}) ->
     false;
@@ -501,7 +513,7 @@ enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Deliver
             || {deliver, Topic, Msg} <- Delivers],
     lists:foldl(fun enqueue/2, Session, Msgs);
 
-enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt})
+enqueue(Msg, Session = #session{mqueue = Q})
   when is_record(Msg, message) ->
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
     if is_record(Dropped, message) ->
@@ -509,7 +521,7 @@ enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt})
                 [emqx_message:format(Dropped)]);
        true -> ok
     end,
-    Session#session{mqueue = NewQ, enqueue_cnt = Cnt+1}.
+    inc_deliver_stats(enqueue_cnt, Session#session{mqueue = NewQ}).
 
 %%--------------------------------------------------------------------
 %% Awaiting ACK for QoS1/QoS2 Messages
@@ -612,11 +624,11 @@ expire_awaiting_rel([], _Now, Session) ->
 
 expire_awaiting_rel([{PacketId, Ts} | More], Now,
                     Session = #session{awaiting_rel = AwaitingRel,
-                                       await_rel_timeout = Timeout}) ->
+                                       awaiting_rel_timeout = Timeout}) ->
     case (timer:now_diff(Now, Ts) div 1000) of
         Age when Age >= Timeout ->
             ok = emqx_metrics:inc('messages.qos2.expired'),
-            ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
+            ?LOG(warning, "Dropped qos2 packet ~s for awaiting_rel_timeout", [PacketId]),
             Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
             expire_awaiting_rel(More, Now, Session1);
         Age ->
@@ -633,3 +645,15 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) ->
 next_pkt_id(Session = #session{next_pkt_id = Id}) ->
     Session#session{next_pkt_id = Id + 1}.
 
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+inc_deliver_stats(Key, Session) ->
+    inc_deliver_stats(Key, 1, Session).
+inc_deliver_stats(Key, I, Session = #session{deliver_stats = undefined}) ->
+    Session#session{deliver_stats = #{Key => I}};
+inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) ->
+    NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats),
+    Session#session{deliver_stats = NStats}.
+

+ 31 - 17
src/emqx_ws_connection.erl

@@ -24,6 +24,11 @@
 
 -logger_header("[MQTT/WS]").
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 %% API
 -export([ info/1
         , stats/1
@@ -88,7 +93,9 @@ info(sockname, #state{sockname = Sockname}) ->
 info(sockstate, #state{sockstate = SockSt}) ->
     SockSt;
 info(channel, #state{channel = Channel}) ->
-    emqx_channel:info(Channel).
+    emqx_channel:info(Channel);
+info(stop_reason, #state{stop_reason = Reason}) ->
+    Reason.
 
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(WsPid) when is_pid(WsPid) ->
@@ -188,8 +195,8 @@ websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
     ?LOG(debug, "RECV ~p", [Data]),
     Oct = iolist_size(Data),
     ok = inc_recv_stats(1, Oct),
-    {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
-    process_incoming(Data, State#state{channel = NChannel});
+    {ok, NChannel} = emqx_channel:recvd(Oct, Channel),
+    parse_incoming(Data, State#state{channel = NChannel});
 
 %% Pings should be replied with pongs, cowboy does it automatically
 %% Pongs can be safely ignored. Clause here simply prevents crash.
@@ -282,16 +289,17 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
             stop(Reason, enqueue(OutPacket, NState))
     end.
 
-%%--------------------------------------------------------------------
-%% Handle timeout
-
-handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
-
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({enter, _}, State = #state{channel = Channel}) ->
+handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
+    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
+    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
+    reply(enqueue(ConnAck, State));
+
+handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
     ChanAttrs = emqx_channel:attrs(Channel),
     SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
     Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
@@ -302,18 +310,24 @@ handle_info(Info, State = #state{channel = Channel}) ->
     handle_return(emqx_channel:handle_info(Info, Channel), State).
 
 %%--------------------------------------------------------------------
-%% Process incoming data
+%% Handle timeout
+
+handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
+    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
+
+%%--------------------------------------------------------------------
+%% Parse incoming data
 
-process_incoming(<<>>, State) ->
+parse_incoming(<<>>, State) ->
     {ok, State};
 
-process_incoming(Data, State = #state{parse_state = ParseState}) ->
+parse_incoming(Data, State = #state{parse_state = ParseState}) ->
     try emqx_frame:parse(Data, ParseState) of
         {more, NParseState} ->
             {ok, State#state{parse_state = NParseState}};
         {ok, Packet, Rest, NParseState} ->
             self() ! {incoming, Packet},
-            process_incoming(Rest, State#state{parse_state = NParseState})
+            parse_incoming(Rest, State#state{parse_state = NParseState})
     catch
         error:Reason:Stk ->
             ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data: ~p",
@@ -343,9 +357,9 @@ handle_return({ok, NChannel}, State) ->
     reply(State#state{channel= NChannel});
 handle_return({ok, Replies, NChannel}, State) ->
     reply(Replies, State#state{channel= NChannel});
-handle_return({stop, Reason, NChannel}, State) ->
+handle_return({shutdown, Reason, NChannel}, State) ->
     stop(Reason, State#state{channel = NChannel});
-handle_return({stop, Reason, OutPacket, NChannel}, State) ->
+handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
     NState = State#state{channel = NChannel},
     stop(Reason, enqueue(OutPacket, NState)).
 
@@ -356,7 +370,7 @@ handle_outgoing(Packets, State = #state{channel = Channel}) ->
     IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
     Oct = iolist_size(IoData),
     ok = inc_sent_stats(length(Packets), Oct),
-    {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
+    NChannel = emqx_channel:sent(Oct, Channel),
     {{binary, IoData}, State#state{channel = NChannel}}.
 
 %% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1

+ 526 - 239
test/emqx_channel_SUITE.erl

@@ -19,11 +19,6 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--import(emqx_channel,
-        [ handle_in/2
-        , handle_out/2
-        ]).
-
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -45,288 +40,580 @@
 
 all() -> emqx_ct:all(?MODULE).
 
+%%--------------------------------------------------------------------
+%% CT Callbacks
+%%--------------------------------------------------------------------
+
 init_per_suite(Config) ->
-    emqx_ct_helpers:boot_modules([router, broker]),
-    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([]).
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    %% CM Meck
+    ok = meck:new(emqx_cm, [passthrough, no_history]),
+    %% Access Control Meck
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(emqx_access_control, authenticate,
+                     fun(_) -> {ok, #{auth_result => success}} end),
+    ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
+    %% Broker Meck
+    ok = meck:new(emqx_broker, [passthrough, no_history]),
+    %% Hooks Meck
+    ok = meck:new(emqx_hooks, [passthrough, no_history]),
+    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
+    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
+    %% Session Meck
+    ok = meck:new(emqx_session, [passthrough, no_history]),
+    %% Metrics
+    ok = meck:new(emqx_metrics, [passthrough, no_history]),
+    ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
+    ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(emqx_access_control),
+    ok = meck:unload(emqx_metrics),
+    ok = meck:unload(emqx_session),
+    ok = meck:unload(emqx_broker),
+    ok = meck:unload(emqx_hooks),
+    ok = meck:unload(emqx_cm),
+    Config.
 
 %%--------------------------------------------------------------------
-%% Test cases for handle_in
+%% Test cases for channel info/stats/caps
 %%--------------------------------------------------------------------
 
-t_handle_connect(_) ->
-    ConnPkt = #mqtt_packet_connect{
-                 proto_name  = <<"MQTT">>,
-                 proto_ver   = ?MQTT_PROTO_V4,
-                 is_bridge   = false,
-                 clean_start = true,
-                 keepalive   = 30,
-                 properties  = undefined,
-                 clientid    = <<"clientid">>,
-                 username    = <<"username">>,
-                 password    = <<"passwd">>
-                },
-    with_channel(
-      fun(Channel) ->
-              ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
-              ExpectedOutput = [{enter, connected},{outgoing, ConnAck}],
-              {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
-              ?assertEqual(ExpectedOutput, Output),
-              #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1),
-              ?assertEqual(<<"clientid">>, ClientId),
-              ?assertEqual(<<"username">>, Username)
-      end).
-
-t_handle_in_publish_qos0(_) ->
-    with_channel(
-      fun(Channel) ->
-              Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
-              {ok, Channel1} = handle_in(Publish, Channel),
-              ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
-
-t_handle_in_publish_qos1(_) ->
-    with_channel(
-      fun(Channel) ->
-              Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
-              {ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, Channel),
-              ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS))
-      end).
-
-t_handle_publish_qos2(_) ->
-    with_channel(
-      fun(Channel) ->
-              Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
-              {ok, ?PUBREC_PACKET(1, RC), Channel1} = handle_in(Publish1, Channel),
-              Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
-              {ok, ?PUBREC_PACKET(2, RC), Channel2} = handle_in(Publish2, Channel1),
-              ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
-              #{awaiting_rel := AwaitingRel} = emqx_channel:info(session, Channel2),
-              ?assertEqual(2, AwaitingRel)
-      end).
-
-t_handle_in_puback(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, Channel1} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
-              ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
-
-t_handle_in_pubrec(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
-                = handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
-              ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
-
-t_handle_in_pubrel(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
-                = handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
-              ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
-
-t_handle_in_pubcomp(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, Channel1} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
-              ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
-
-t_handle_subscribe(_) ->
-    with_channel(
-      fun(Channel) ->
-              TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
-              {ok, ?SUBACK_PACKET(10, [?QOS_0]), Channel1}
-                = handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), Channel),
-              #{subscriptions := Subscriptions}
-                = emqx_channel:info(session, Channel1),
-              ?assertEqual(maps:from_list(TopicFilters), Subscriptions)
-      end).
-
-t_handle_unsubscribe(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?UNSUBACK_PACKET(11), Channel}
-                = handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), Channel)
-      end).
-
-t_handle_pingreq(_) ->
-    with_channel(
-      fun(Channel) ->
-          {ok, ?PACKET(?PINGRESP), Channel} = handle_in(?PACKET(?PINGREQ), Channel)
-      end).
-
-t_handle_disconnect(_) ->
-    with_channel(
-      fun(Channel) ->
-              {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
-              ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1))
-      end).
+t_chan_info(_) ->
+    #{conn_state := connected,
+      clientinfo := ClientInfo
+     } = emqx_channel:info(channel()),
+    ?assertEqual(clientinfo(), ClientInfo).
 
-t_handle_in_auth(_) ->
-    with_channel(
-      fun(Channel) ->
-              Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
-              {stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel)
-      end).
+t_chan_attrs(_) ->
+    #{conn_state := connected} = emqx_channel:attrs(channel()).
+
+t_chan_stats(_) ->
+    [] = emqx_channel:stats(channel()).
+
+t_chan_caps(_) ->
+    Caps = emqx_channel:caps(channel()).
+
+t_chan_recvd(_) ->
+    _Channel = emqx_channel:recvd(10, channel()).
+
+t_chan_sent(_) ->
+    _Channel = emqx_channel:sent(10, channel()).
+
+%%--------------------------------------------------------------------
+%% Test cases for channel init
+%%--------------------------------------------------------------------
+
+%% TODO:
+t_chan_init(_) ->
+    Channel = channel().
 
 %%--------------------------------------------------------------------
-%% Test cases for handle_deliver
+%% Test cases for channel handle_in
 %%--------------------------------------------------------------------
 
-t_handle_deliver(_) ->
-    with_connected_channel(
-      fun(Channel) ->
-              TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}],
-              {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1}
-                = handle_in(?SUBSCRIBE_PACKET(1, #{}, TopicFilters), Channel),
-              Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>),
-              Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>),
-              Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
-              {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1),
-              ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets])
-      end).
+t_handle_in_connect_packet_sucess(_) ->
+    ok = meck:expect(emqx_cm, open_session,
+                     fun(true, _ClientInfo, _ConnInfo) ->
+                             {ok, #{session => session(), present => false}}
+                     end),
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0)}], Channel}
+        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel(#{conn_state => idle})),
+    ClientInfo = emqx_channel:info(clientinfo, Channel),
+    ?assertMatch(#{clientid := <<"clientid">>,
+                   username := <<"username">>
+                  }, ClientInfo),
+    ?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
+
+t_handle_in_unexpected_connect_packet(_) ->
+    Channel = emqx_channel:set_field(conn_state, connected, channel()),
+    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Channel}
+        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
+
+t_handle_in_qos0_publish(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
+    Channel = channel(#{conn_state => connected}),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
+    {ok, NChannel} = emqx_channel:handle_in(Publish, Channel),
+    ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
+
+t_handle_in_qos1_publish(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
+    Channel = channel(#{conn_state => connected}),
+    Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
+    {ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
+    ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
+    ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
+
+t_handle_in_qos2_publish(_) ->
+    ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
+    ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end),
+    Channel = channel(#{conn_state => connected}),
+    Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
+    {ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
+    ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
+    ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
+
+t_handle_in_puback_ok(_) ->
+    Msg = emqx_message:make(<<"t">>, <<"payload">>),
+    ok = meck:expect(emqx_session, puback,
+                     fun(PacketId, Session) -> {ok, Msg, Session} end),
+    Channel = channel(#{conn_state => connected}),
+    {ok, NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
+    ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
+
+t_handle_in_puback_id_in_use(_) ->
+    ok = meck:expect(emqx_session, puback,
+                     fun(_, _Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+                     end),
+    {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
+
+t_handle_in_puback_id_not_found(_) ->
+    ok = meck:expect(emqx_session, puback,
+                     fun(_, _Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
+                     end),
+    {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
+
+t_handle_in_pubrec_ok(_) ->
+    Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
+    ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
+    Channel = channel(#{conn_state => connected}),
+    {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), Channel1}
+        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
+    ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
+                 emqx_channel:info(pub_stats, Channel1)).
+
+t_handle_in_pubrec_id_in_use(_) ->
+    ok = meck:expect(emqx_session, pubrec,
+                     fun(_, Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+                     end),
+    {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel}
+        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
+                 emqx_channel:info(pub_stats, Channel)).
+
+t_handle_in_pubrec_id_not_found(_) ->
+    ok = meck:expect(emqx_session, pubrec,
+                     fun(_, Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
+                     end),
+    {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
+        = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
+                 emqx_channel:info(pub_stats, Channel)).
+
+t_handle_in_pubrel_ok(_) ->
+    ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
+    Channel = channel(#{conn_state => connected}),
+    {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel1}
+        = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
+    ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
+                 emqx_channel:info(pub_stats, Channel1)).
+
+t_handle_in_pubrel_not_found_error(_) ->
+    ok = meck:expect(emqx_session, pubrel,
+                     fun(_PacketId, _Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
+                     end),
+    {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
+        = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
+
+t_handle_in_pubcomp_ok(_) ->
+    ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
+    {ok, Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()),
+    ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
+
+t_handle_in_pubcomp_not_found_error(_) ->
+    ok = meck:expect(emqx_session, pubcomp,
+                     fun(_PacketId, _Session) ->
+                             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
+                     end),
+    Channel = channel(#{conn_state => connected}),
+    {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
+    ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
+
+t_handle_in_subscribe(_) ->
+    ok = meck:expect(emqx_session, subscribe,
+                     fun(_, _, _, Session) ->
+                             {ok, Session}
+                     end),
+    Channel = channel(#{conn_state => connected}),
+    TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
+    Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
+    {ok, ?SUBACK_PACKET(1, [?QOS_0]), _} = emqx_channel:handle_in(Subscribe, Channel).
+
+t_handle_in_unsubscribe(_) ->
+    ok = meck:expect(emqx_session, unsubscribe,
+                     fun(_, _, Session) ->
+                             {ok, Session}
+                     end),
+    Channel = channel(#{conn_state => connected}),
+    UnsubPkt = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]),
+    {ok, ?UNSUBACK_PACKET(1), _} = emqx_channel:handle_in(UnsubPkt, Channel).
+
+t_handle_in_pingreq(_) ->
+    {ok, ?PACKET(?PINGRESP), _Channel}
+        = emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
+
+t_handle_in_disconnect(_) ->
+    Channel = channel(#{conn_state => connected}),
+    {shutdown, normal, Channel1} = emqx_channel:handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
+    ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)).
+
+t_handle_in_auth(_) ->
+    Channel = channel(#{conn_state => connected}),
+    Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
+    {shutdown, implementation_specific_error, Packet, Channel}
+        = emqx_channel:handle_in(?AUTH_PACKET(), Channel).
+
+t_handle_in_frame_error(_) ->
+    IdleChannel = channel(#{conn_state => idle}),
+    {shutdown, frame_too_large, _}
+        = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
+    ConnectingChan =  channel(#{conn_state => connecting}),
+    {shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _}
+        = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
+    ConnectedChan = channel(#{conn_state => connected}),
+    {shutdown, malformed_Packet, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
+        = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
+    DisconnectedChan = channel(#{conn_state => disconnected}),
+    {ok, DisconnectedChan}
+        = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
+
+%% TODO:
+t_handle_in_expected_packet(_) ->
+    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _Chan}
+        = emqx_channel:handle_in(packet, channel()).
+
+t_process_connect(_) ->
+    ok = meck:expect(emqx_cm, open_session,
+                     fun(true, _ClientInfo, _ConnInfo) ->
+                             {ok, #{session => session(), present => false}}
+                     end),
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Channel}
+        = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
+
+t_handle_publish_qos0(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
+    {ok, _Channel} = emqx_channel:handle_publish(Publish, channel()).
+
+t_process_publish_qos1(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
+        = emqx_channel:process_publish(1, Msg, channel()).
+
+t_process_subscribe(_) ->
+    ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
+    TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
+    {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()).
+
+t_process_unsubscribe(_) ->
+    ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end),
+    TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
+    {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()).
 
 %%--------------------------------------------------------------------
 %% Test cases for handle_out
 %%--------------------------------------------------------------------
 
-t_handle_out_connack(_) ->
-    ConnPkt = #mqtt_packet_connect{
-                 proto_name  = <<"MQTT">>,
-                 proto_ver   = ?MQTT_PROTO_V4,
-                 clean_start = true,
-                 properties  = #{},
-                 clientid    = <<"clientid">>
-                },
-    with_channel(
-      fun(Channel) ->
-              {ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
-                = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
-              {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
-                = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
-      end).
+t_handle_out_delivers(_) ->
+    WithPacketId = fun(Msgs) ->
+                           lists:zip(lists:seq(1, length(Msgs)), Msgs)
+                   end,
+    ok = meck:expect(emqx_session, deliver,
+                     fun(Delivers, Session) ->
+                             Msgs = [Msg || {deliver, _, Msg} <- Delivers],
+                             Publishes = [{publish, PacketId, Msg}
+                                          || {PacketId, Msg} <- WithPacketId(Msgs)],
+                             {ok, Publishes, Session}
+                     end),
+    ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20000 end),
+    Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
+    Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
+    Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
+    {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, channel()),
+    ?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
+
+t_handle_out_publishes(_) ->
+    Channel = channel(#{conn_state => connected}),
+    Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
+    Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
+    {ok, {outgoing, Packets}, NChannel}
+        = emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel),
+    ?assertEqual(2, length(Packets)),
+    ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_out_publish(_) ->
-    with_channel(
-      fun(Channel) ->
-              Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
-              Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
-              {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel),
-              {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel),
-              {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
-              ?assertEqual(2, length(Packets)),
-              ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1))
-      end).
+    Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
+        = emqx_channel:handle_out({publish, 1, Msg}, channel()).
+
+t_handle_out_publish_nl(_) ->
+    ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
+    Channel = channel(#{clientinfo => ClientInfo}),
+    Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
+    Publish = {publish, 1, emqx_message:set_flag(nl, Msg)},
+    {ok, Channel} = emqx_channel:handle_out(Publish, Channel).
+
+t_handle_out_connack_sucess(_) ->
+    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
+        = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()).
+
+t_handle_out_connack_failure(_) ->
+    {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
+        = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
 
 t_handle_out_puback(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, Channel} = handle_out({puberr, ?RC_NOT_AUTHORIZED}, Channel),
-              {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Channel1}
-                = handle_out({puback, 1, ?RC_SUCCESS}, Channel),
-              ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
+    Channel = channel(#{conn_state => connected}),
+    {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), NChannel}
+        = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel),
+    ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_out_pubrec(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?PUBREC_PACKET(4, ?RC_SUCCESS), Channel1}
-                = handle_out({pubrec, 4, ?RC_SUCCESS}, Channel),
-              ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
+    Channel = channel(#{conn_state => connected}),
+    {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel}
+        = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel),
+    ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_out_pubrel(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?PUBREL_PACKET(2), Channel1}
-                = handle_out({pubrel, 2, ?RC_SUCCESS}, Channel),
-              {ok, ?PUBREL_PACKET(3, ?RC_SUCCESS), Channel2}
-                = handle_out({pubrel, 3, ?RC_SUCCESS}, Channel1),
-              ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2))
-      end).
+    Channel = channel(#{conn_state => connected}),
+    {ok, ?PUBREL_PACKET(1), Channel1}
+        = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
+    {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2}
+        = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1),
+    ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
 
 t_handle_out_pubcomp(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?PUBCOMP_PACKET(5, ?RC_SUCCESS), Channel1}
-                = handle_out({pubcomp, 5, ?RC_SUCCESS}, Channel),
-              ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1))
-      end).
+    {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel}
+        = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()),
+    ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_out_suback(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel}
-                 = handle_out({suback, 1, [?QOS_2]}, Channel)
-      end).
+    {ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
+        = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
 
 t_handle_out_unsuback(_) ->
-    with_channel(
-      fun(Channel) ->
-              {ok, ?UNSUBACK_PACKET(1), Channel}
-                = handle_out({unsuback, 1, [?RC_SUCCESS]}, Channel)
-      end).
+    {ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel}
+        = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
 
 t_handle_out_disconnect(_) ->
-    with_channel(
-      fun(Channel) ->
-              handle_out({disconnect, ?RC_SUCCESS}, Channel)
-      end).
+    {shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan}
+        = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
+
+t_handle_out_unexpected(_) ->
+    {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_call
+%%--------------------------------------------------------------------
+
+t_handle_call_kick(_) ->
+    {shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()).
+
+t_handle_call_discard(_) ->
+    Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
+    {shutdown, discarded, ok, Packet, _Channel}
+        = emqx_channel:handle_call(discard, channel()).
+
+t_handle_call_takeover_begin(_) ->
+    {reply, undefined, _Channel}
+        = emqx_channel:handle_call({takeover, 'begin'}, channel()).
+
+t_handle_call_takeover_end(_) ->
+    ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
+    {shutdown, takeovered, [], _Channel}
+        = emqx_channel:handle_call({takeover, 'end'}, channel()).
+
+t_handle_call_unexpected(_) ->
+    {reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()).
+
+%%--------------------------------------------------------------------
+%% Test cases for handle_info
+%%--------------------------------------------------------------------
+
+t_handle_info_subscribe(_) ->
+    ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
+    {ok, _Chan} = emqx_channel:handle_info({subscribe, topic_filters()}, channel()).
+
+t_handle_info_unsubscribe(_) ->
+    ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end),
+    {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()).
+
+t_handle_info_sock_closed(_) ->
+    {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason},
+                                          channel(#{conn_state => disconnected})).
 
 %%--------------------------------------------------------------------
 %% Test cases for handle_timeout
 %%--------------------------------------------------------------------
 
-t_handle_timeout(_) ->
-    with_channel(
-      fun(Channel) ->
-        'TODO'
-      end).
+t_handle_timeout_emit_stats(_) ->
+    ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel).
+
+t_handle_timeout_keepalive(_) ->
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()).
+
+t_handle_timeout_retry_delivery(_) ->
+    ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, channel()).
+
+t_handle_timeout_expire_awaiting_rel(_) ->
+    ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
+
+t_handle_timeout_expire_session(_) ->
+    TRef = make_ref(),
+    Channel = emqx_channel:set_field(timers, #{expire_timer => TRef}, channel()),
+    {shutdown, expired, _Chan} = emqx_channel:handle_timeout(TRef, expire_session, Channel).
+
+t_handle_timeout_will_message(_) ->
+    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), will_message, channel()).
+
+%%--------------------------------------------------------------------
+%% Test cases for internal functions
+%%--------------------------------------------------------------------
+
+t_enrich_conninfo(_) ->
+    {ok, _Chan} = emqx_channel:enrich_conninfo(connpkt(), channel()).
+
+t_enrich_client(_) ->
+    {ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()).
+
+t_check_banned(_) ->
+    ok = emqx_channel:check_banned(connpkt(), channel()).
+
+t_check_flapping(_) ->
+    ok = emqx_channel:check_flapping(connpkt(), channel()).
+
+t_auth_connect(_) ->
+    {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()).
+
+t_process_alias(_) ->
+    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+    Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()),
+    {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan}
+        = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
+
+t_check_pub_acl(_) ->
+    ok = meck:new(emqx_zone, [passthrough, no_history]),
+    ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end),
+    Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
+    ok = emqx_channel:check_pub_acl(Publish, channel()),
+    ok = meck:unload(emqx_zone).
+
+t_check_pub_alias(_) ->
+    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+    Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()),
+    ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
+
+t_check_subscribe(_) ->
+    ok = meck:new(emqx_zone, [passthrough, no_history]),
+    ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end),
+    ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()),
+    ok = meck:unload(emqx_zone).
+
+t_enrich_caps(_) ->
+    ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
+    ok = meck:expect(emqx_mqtt_caps, get_caps,
+                     fun(_Zone) ->
+                        #{max_packet_size => 1024,
+                          max_qos_allowed => ?QOS_2,
+                          retain_available => true,
+                          max_topic_alias => 10,
+                          shared_subscription => true,
+                          wildcard_subscription => true
+                         }
+                     end),
+    AckProps = emqx_channel:enrich_caps(#{}, channel()),
+    ?assertMatch(#{'Retain-Available' := 1,
+                   'Maximum-Packet-Size' := 1024,
+                   'Topic-Alias-Maximum' := 10,
+                   'Wildcard-Subscription-Available' := 1,
+                   'Subscription-Identifier-Available' := 1,
+                   'Shared-Subscription-Available' := 1,
+                   'Maximum-QoS' := ?QOS_2
+                  }, AckProps),
+    ok = meck:unload(emqx_mqtt_caps).
 
 %%--------------------------------------------------------------------
 %% Test cases for terminate
 %%--------------------------------------------------------------------
 
 t_terminate(_) ->
-    with_channel(
-      fun(Channel) ->
-        'TODO'
-      end).
+    ok = emqx_channel:terminate(normal, channel()),
+    ok = emqx_channel:terminate(sock_error, channel(#{conn_state => connected})),
+    ok = emqx_channel:terminate({shutdown, kicked}, channel(#{conn_state => connected})).
 
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
-with_connected_channel(TestFun) ->
-    with_channel(
-      fun(Channel) ->
-          TestFun(emqx_channel:set_field(conn_state, connected, Channel))
-      end).
-
-with_channel(TestFun) ->
-    with_channel(#{}, TestFun).
-
-with_channel(ConnInfo, TestFun) ->
-    ConnInfo1 = maps:merge(?DEFAULT_CONNINFO, ConnInfo),
-    ClientInfo = #{zone => <<"external">>,
-                   protocol => mqtt,
-                   peerhost => {127,0,0,1},
-                   clientid => <<"clientid">>,
-                   username => <<"username">>,
-                   peercert => undefined,
-                   is_bridge => false,
-                   is_superuser => false,
-                   mountpoint => undefined
-                  },
-    Channel = emqx_channel:init(ConnInfo1, [{zone, testing}]),
-    Session = emqx_session:init(ClientInfo, ConnInfo1),
-    Channel1 = emqx_channel:set_field(clientinfo, ClientInfo, Channel),
-    TestFun(emqx_channel:set_field(session, Session, Channel1)).
+channel() -> channel(#{}).
+channel(InitFields) ->
+    maps:fold(fun(Field, Value, Channel) ->
+                      emqx_channel:set_field(Field, Value, Channel)
+              end, default_channel(), InitFields).
+
+default_channel() ->
+    Channel = emqx_channel:init(?DEFAULT_CONNINFO, [{zone, zone}]),
+    Channel1 = emqx_channel:set_field(conn_state, connected, Channel),
+    emqx_channel:set_field(clientinfo, clientinfo(), Channel1).
+
+clientinfo() -> clientinfo(#{}).
+clientinfo(InitProps) ->
+    maps:merge(#{zone       => zone,
+                 protocol   => mqtt,
+                 peerhost   => {127,0,0,1},
+                 clientid   => <<"clientid">>,
+                 username   => <<"username">>,
+                 is_superuser => false,
+                 peercert   => undefined,
+                 mountpoint => undefined
+                }, InitProps).
+
+topic_filters() ->
+    [{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}].
+
+connpkt() ->
+    #mqtt_packet_connect{
+       proto_name  = <<"MQTT">>,
+       proto_ver   = ?MQTT_PROTO_V4,
+       is_bridge   = false,
+       clean_start = true,
+       keepalive   = 30,
+       properties  = undefined,
+       clientid    = <<"clientid">>,
+       username    = <<"username">>,
+       password    = <<"passwd">>
+      }.
+
+session() -> session(#{}).
+session(InitFields) when is_map(InitFields) ->
+    maps:fold(fun(Field, Value, Session) ->
+                      emqx_session:set_field(Field, Value, Session)
+              end,
+              emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
+              InitFields).
 

+ 296 - 26
test/emqx_connection_SUITE.erl

@@ -19,40 +19,310 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
+                     recv_oct, recv_cnt, send_oct, send_cnt,
+                     send_pend
+                    ]).
+
 all() -> emqx_ct:all(?MODULE).
 
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
 init_per_suite(Config) ->
-    emqx_ct_helpers:boot_modules(all),
-    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([]).
-
-t_basic(_) ->
-    Topic = <<"TopicA">>,
-    {ok, C} = emqtt:start_link([{port, 1883}, {clientid, <<"hello">>}]),
-    {ok, _} = emqtt:connect(C),
-    {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
-    {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    ?assertEqual(3, length(recv_msgs(3))),
-    ok = emqtt:disconnect(C).
-
-recv_msgs(Count) ->
-    recv_msgs(Count, []).
-
-recv_msgs(0, Msgs) ->
-    Msgs;
-recv_msgs(Count, Msgs) ->
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    %% Meck Transport
+    ok = meck:new(emqx_transport, [non_strict, passthrough, no_history]),
+    ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
+    ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
+    ok = meck:expect(emqx_transport, ensure_ok_or_exit,
+                     fun(peername, [sock]) -> {ok, {{127,0,0,1}, 3456}};
+                        (sockname, [sock]) -> {ok, {{127,0,0,1}, 1883}};
+                        (peercert, [sock]) -> undefined
+                     end),
+    ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
+    ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
+                                                      {ok, [{K, 0} || K <- Options]}
+                                              end),
+    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
+    ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
+    %% Meck Channel
+    ok = meck:new(emqx_channel, [passthrough, no_history]),
+    %% Meck Metrics
+    ok = meck:new(emqx_metrics, [passthrough, no_history]),
+    ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
+    ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
+    ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(emqx_transport),
+    ok = meck:unload(emqx_channel),
+    ok = meck:unload(emqx_metrics),
+    Config.
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_start_link_ok(_) ->
+    with_connection(fun(CPid) ->
+                            state = element(1, sys:get_state(CPid))
+                    end).
+
+t_start_link_exit_on_wait(_) ->
+    ok = exit_on_wait_error(enotconn, normal),
+    ok = exit_on_wait_error(einval, normal),
+    ok = exit_on_wait_error(closed, normal),
+    ok = exit_on_wait_error(timeout, {shutdown, ssl_upgrade_timeout}),
+    ok = exit_on_wait_error(enetdown, {shutdown, enetdown}).
+
+t_start_link_exit_on_activate(_) ->
+    ok = exit_on_activate_error(enotconn, normal),
+    ok = exit_on_activate_error(einval, normal),
+    ok = exit_on_activate_error(closed, normal),
+    ok = exit_on_activate_error(econnreset, {shutdown, econnreset}).
+
+t_get_conn_info(_) ->
+    with_connection(fun(CPid) ->
+                            #{sockinfo := SockInfo} = emqx_connection:info(CPid),
+                            ?assertEqual(#{active_n   => 100,
+                                           peername   => {{127,0,0,1},3456},
+                                           pub_limit  => undefined,
+                                           rate_limit => undefined,
+                                           sockname   => {{127,0,0,1},1883},
+                                           sockstate  => running,
+                                           socktype   => tcp}, SockInfo)
+                    end).
+
+t_get_conn_stats(_) ->
+    with_connection(fun(CPid) ->
+                            Stats = emqx_connection:stats(CPid),
+                            lists:foreach(fun(Key) ->
+                                                  0 = proplists:get_value(Key, Stats)
+                                          end, ?STATS_KYES)
+                    end).
+
+t_handle_call_discard(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_call,
+                                             fun(discard, Channel) ->
+                                                     {shutdown, discarded, ok, Channel}
+                                             end),
+                            ok = emqx_connection:call(CPid, discard),
+                            timer:sleep(100),
+                            ok = trap_exit(CPid, {shutdown, discarded})
+                    end, #{trap_exit => true}).
+
+t_handle_call_takeover(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_call,
+                                              fun({takeover, 'begin'}, Channel) ->
+                                                      {reply, session, Channel};
+                                                 ({takeover, 'end'}, Channel) ->
+                                                      {shutdown, takeovered, [], Channel}
+                                              end),
+                            session = emqx_connection:call(CPid, {takeover, 'begin'}),
+                            [] = emqx_connection:call(CPid, {takeover, 'end'}),
+                            timer:sleep(100),
+                            ok = trap_exit(CPid, {shutdown, takeovered})
+                    end, #{trap_exit => true}).
+
+t_handle_call_any(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_call,
+                                             fun(_Req, Channel) -> {reply, ok, Channel} end),
+                            ok = emqx_connection:call(CPid, req)
+                    end).
+
+t_handle_incoming_connect(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+                            ConnPkt = #mqtt_packet_connect{proto_ver   = ?MQTT_PROTO_V5,
+                                                           proto_name  = <<"MQTT">>,
+                                                           clientid    = <<>>,
+                                                           clean_start = true,
+                                                           keepalive   = 60
+                                                          },
+                            Frame = make_frame(?CONNECT_PACKET(ConnPkt)),
+                            CPid ! {tcp, sock, Frame}
+                    end).
+
+t_handle_incoming_publish(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+                            Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)),
+                            CPid ! {tcp, sock, Frame}
+                    end).
+
+t_handle_incoming_subscribe(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+                            Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
+                            CPid ! {tcp, sock, Frame}
+                    end).
+
+t_handle_incoming_unsubscribe(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+                            Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
+                            CPid ! {tcp, sock, Frame}
+                    end).
+
+t_handle_sock_error(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_info,
+                                             fun({_, Reason}, Channel) ->
+                                                     {shutdown, Reason, Channel}
+                                             end),
+                            %% TODO: fixme later
+                            CPid ! {tcp_error, sock, econnreset},
+                            timer:sleep(100),
+                            trap_exit(CPid, {shutdown, econnreset})
+                    end, #{trap_exit => true}).
+
+t_handle_sock_passive(_) ->
+    with_connection(fun(CPid) -> CPid ! {tcp_passive, sock} end).
+
+t_handle_sock_activate(_) ->
+    with_connection(fun(CPid) -> CPid ! activate_socket end).
+
+t_handle_sock_closed(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_info,
+                                             fun({sock_closed, Reason}, Channel) ->
+                                                     {shutdown, Reason, Channel}
+                                             end),
+                            CPid ! {tcp_closed, sock},
+                            timer:sleep(100),
+                            %%TODO: closed?
+                            trap_exit(CPid, {shutdown, closed})
+                    end, #{trap_exit => true}).
+
+t_handle_outgoing(_) ->
+    with_connection(fun(CPid) ->
+                            Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
+                            CPid ! {outgoing, Publish},
+                            CPid ! {outgoing, ?PUBREL_PACKET(1)},
+                            CPid ! {outgoing, [?PUBCOMP_PACKET(1)]}
+                    end).
+
+t_conn_rate_limit(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
+                            lists:foreach(fun(I) ->
+                                                  Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)),
+                                                  CPid ! {tcp, sock, make_frame(Publish)}
+                                          end, [1, 2])
+                            %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
+                    end, #{active_n => 1, rate_limit => {1, 1024}}).
+
+t_conn_pub_limit(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
+                            ok = lists:foreach(fun(I) ->
+                                                       CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)}
+                                               end, lists:seq(1, 3))
+                            %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
+                    end, #{active_n => 1, publish_limit => {1, 2}}).
+
+t_oom_shutdown(_) ->
+    with_connection(fun(CPid) ->
+                            CPid ! {shutdown, message_queue_too_long},
+                            timer:sleep(100),
+                            trap_exit(CPid, {shutdown, message_queue_too_long})
+                    end, #{trap_exit => true}).
+
+t_handle_idle_timeout(_) ->
+    ok = emqx_zone:set_env(external, idle_timeout, 10),
+    with_connection(fun(CPid) ->
+                            timer:sleep(100),
+                            trap_exit(CPid, {shutdown, idle_timeout})
+                    end, #{zone => external, trap_exit => true}).
+
+t_handle_emit_stats(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_timeout,
+                                             fun(_TRef, _TMsg, Channel) ->
+                                                     {ok, Channel}
+                                             end),
+                            CPid ! {timeout, make_ref(), emit_stats}
+                    end).
+
+t_handle_keepalive_timeout(_) ->
+    with_connection(fun(CPid) ->
+                            ok = meck:expect(emqx_channel, handle_timeout,
+                                             fun(_TRef, _TMsg, Channel) ->
+                                                     {shutdown, keepalive_timeout, Channel}
+                                             end),
+                            CPid ! {timeout, make_ref(), keepalive},
+                            timer:sleep(100),
+                            trap_exit(CPid, {shutdown, keepalive_timeout})
+                    end, #{trap_exit => true}).
+
+t_handle_shutdown(_) ->
+    with_connection(fun(CPid) ->
+                            CPid ! Shutdown = {shutdown, reason},
+                            timer:sleep(100),
+                            trap_exit(CPid, Shutdown)
+                    end, #{trap_exit => true}).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+exit_on_wait_error(SockErr, Reason) ->
+    ok = meck:expect(emqx_transport, wait,
+                     fun(_Sock) ->
+                             {error, SockErr}
+                     end),
+    with_connection(fun(CPid) ->
+                            timer:sleep(100),
+                            trap_exit(CPid, Reason)
+                    end, #{trap_exit => true}).
+
+exit_on_activate_error(SockErr, Reason) ->
+    ok = meck:expect(emqx_transport, setopts,
+                     fun(_Sock, _Opts) ->
+                             {error, SockErr}
+                     end),
+    with_connection(fun(CPid) ->
+                            timer:sleep(100),
+                            trap_exit(CPid, Reason)
+                    end, #{trap_exit => true}).
+
+with_connection(TestFun) ->
+    with_connection(TestFun, #{trap_exit => false}).
+
+with_connection(TestFun, Options) when is_map(Options) ->
+    with_connection(TestFun, maps:to_list(Options));
+with_connection(TestFun, Options) ->
+    TrapExit = proplists:get_value(trap_exit, Options, false),
+    process_flag(trap_exit, TrapExit),
+    {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options),
+    TestFun(CPid),
+    TrapExit orelse emqx_connection:stop(CPid),
+    ok.
+
+trap_exit(Pid, Reason) ->
     receive
-        {publish, Msg} ->
-            recv_msgs(Count-1, [Msg|Msgs])
-    after 100 ->
-        Msgs
+        {'EXIT', Pid, Reason} -> ok;
+        {'EXIT', Pid, Other}  -> error({unexpect_exit, Other})
+    after
+        0 -> error({expect_exit, Reason})
     end.
 
+make_frame(Packet) ->
+    iolist_to_binary(emqx_frame:serialize(Packet)).
+
+payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
+

+ 226 - 290
test/emqx_session_SUITE.erl

@@ -23,304 +23,240 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
--define(mock_modules,
-        [ emqx_metrics
-        , emqx_broker
-        , emqx_misc
-        , emqx_message
-        , emqx_hooks
-        , emqx_zone
-        ]).
+-import(emqx_session, [set_field/3]).
 
 all() -> emqx_ct:all(?MODULE).
 
-t_proper_session(_) ->
-    Opts = [{numtests, 100}, {to_file, user}],
-    ok = emqx_logger:set_log_level(emergency),
-    ok = before_proper(),
-    ?assert(proper:quickcheck(prop_session(), Opts)),
-    ok = after_proper().
-
-before_proper() ->
-    load(?mock_modules).
-
-after_proper() ->
-    unload(?mock_modules),
-    emqx_logger:set_log_level(error).
-
-prop_session() ->
-    ?FORALL({Session, OpList}, {session(), session_op_list()},
-            begin
-                try
-                    apply_ops(Session, OpList),
-                    true
-                after
-                    true
-                end
-            end).
-
-%%%%%%%%%%%%%%%
-%%% Helpers %%%
-%%%%%%%%%%%%%%%
-
-apply_ops(Session, []) ->
-    ?assertEqual(session, element(1, Session));
-apply_ops(Session, [Op | Rest]) ->
-    NSession = apply_op(Session, Op),
-    apply_ops(NSession, Rest).
-
-apply_op(Session, info) ->
-    Info = emqx_session:info(Session),
-    ?assert(is_map(Info)),
-    ?assert(maps:size(Info) > 0),
-    Session;
-apply_op(Session, attrs) ->
-    Attrs = emqx_session:attrs(Session),
-    ?assert(is_map(Attrs)),
-    ?assert(maps:size(Attrs) > 0),
-    Session;
-apply_op(Session, stats) ->
-    Stats = emqx_session:stats(Session),
-    ?assert(is_list(Stats)),
-    ?assert(length(Stats) > 0),
-    Session;
-apply_op(Session, {info, InfoArg}) ->
-    _Ret = emqx_session:info(InfoArg, Session),
-    Session;
-apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) ->
-    case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of
-        {ok, NSession} ->
-            NSession;
-        {error, ?RC_QUOTA_EXCEEDED} ->
-            Session
-    end;
-apply_op(Session, {unsubscribe, {Client, TopicFilter}}) ->
-    case emqx_session:unsubscribe(Client, TopicFilter, Session) of
-        {ok, NSession} ->
-            NSession;
-        {error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
-            Session
-    end;
-apply_op(Session, {publish, {PacketId, Msg}}) ->
-    case emqx_session:publish(PacketId, Msg, Session) of
-        {ok, _Msg} ->
-            Session;
-        {ok, _Deliver, NSession} ->
-            NSession;
-        {error, _ErrorCode} ->
-            Session
-    end;
-apply_op(Session, {puback, PacketId}) ->
-    case emqx_session:puback(PacketId, Session) of
-        {ok, _Msg, NSession} ->
-            NSession;
-        {ok, _Msg, _Publishes, NSession} ->
-            NSession;
-        {error, _ErrorCode} ->
-            Session
-    end;
-apply_op(Session, {pubrec, PacketId}) ->
-    case emqx_session:pubrec(PacketId, Session) of
-        {ok, _Msg, NSession} ->
-            NSession;
-        {error, _ErrorCode} ->
-            Session
-    end;
-apply_op(Session, {pubrel, PacketId}) ->
-    case emqx_session:pubrel(PacketId, Session) of
-        {ok, NSession} ->
-            NSession;
-        {error, _ErrorCode} ->
-            Session
-    end;
-apply_op(Session, {pubcomp, PacketId}) ->
-    case emqx_session:pubcomp(PacketId, Session) of
-        {ok, _Msgs} ->
-            Session;
-        {ok, _Msgs, NSession} ->
-            NSession;
-        {error, _ErrorCode} ->
-            Session
-    end;
-apply_op(Session, {deliver, Delivers}) ->
-    {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
-    NSession.
-
-%%%%%%%%%%%%%%%%%%
-%%% Generators %%%
-%%%%%%%%%%%%%%%%%%
-session_op_list() ->
-    Union = [info,
-             attrs,
-             stats,
-             {info, info_args()},
-             {subscribe, sub_args()},
-             {unsubscribe, unsub_args()},
-             {publish, publish_args()},
-             {puback, puback_args()},
-             {pubrec, pubrec_args()},
-             {pubrel, pubrel_args()},
-             {pubcomp, pubcomp_args()},
-             {deliver, deliver_args()}
-            ],
-    list(?LAZY(oneof(Union))).
-
-deliver_args() ->
-    list({deliver, topic(), message()}).
-
-info_args() ->
-    oneof([subscriptions,
-           subscriptions_max,
-           upgrade_qos,
-           inflight,
-           inflight_max,
-           retry_interval,
-           mqueue_len,
-           mqueue_max,
-           mqueue_dropped,
-           next_pkt_id,
-           awaiting_rel,
-           awaiting_rel_max,
-           await_rel_timeout,
-           created_at
-          ]).
-
-sub_args() ->
-    ?LET({ClientId, TopicFilter, SubOpts},
-         {clientid(), topic(), sub_opts()},
-         {#{clientid => ClientId}, TopicFilter, SubOpts}).
-
-unsub_args() ->
-    ?LET({ClientId, TopicFilter},
-         {clientid(), topic()},
-         {#{clientid => ClientId}, TopicFilter}).
-
-publish_args() ->
-    ?LET({PacketId, Message},
-         {packetid(), message()},
-         {PacketId, Message}).
-
-puback_args() ->
-    packetid().
-
-pubrec_args() ->
-    packetid().
-
-pubrel_args() ->
-    packetid().
-
-pubcomp_args() ->
-    packetid().
-
-sub_opts() ->
-    ?LET({RH, RAP, NL, QOS, SHARE, SUBID},
-         {rh(), rap(), nl(), qos(), share(), subid()}
-        , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)).
-
-message() ->
-    ?LET({QoS, Topic, Payload},
-         {qos(), topic(), payload()},
-         emqx_message:make(proper, QoS, Topic, Payload)).
-
-subid() -> integer().
-
-rh() -> oneof([0, 1, 2]).
-
-rap() -> oneof([0, 1]).
-
-nl() -> oneof([0, 1]).
-
-qos() -> oneof([0, 1, 2]).
-
-share() -> binary().
-
-clientid() -> binary().
-
-topic() -> ?LET(No, choose(1, 10),
-                begin
-                    NoBin = integer_to_binary(No),
-                    <<"topic/", NoBin/binary>>
-                end).
-
-payload() -> binary().
-
-packetid() -> choose(1, 30).
-
-zone() ->
-    ?LET(Zone, [{max_subscriptions, max_subscription()},
-                {upgrade_qos, upgrade_qos()},
-                {retry_interval, retry_interval()},
-                {max_awaiting_rel, max_awaiting_rel()},
-                {await_rel_timeout, await_rel_timeout()}]
-        , maps:from_list(Zone)).
-
-max_subscription() ->
-    frequency([{33, 0},
-               {33, 1},
-               {34, choose(0,10)}]).
-
-upgrade_qos() -> bool().
-
-retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000).
-
-max_awaiting_rel() -> choose(0, 10).
-
-await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000).
-
-max_inflight() -> choose(0, 10).
-
-option() ->
-    ?LET(Option, [{receive_maximum , max_inflight()}],
-         maps:from_list(Option)).
-
-session() ->
-    ?LET({Zone, Options},
-         {zone(), option()},
-         begin
-             Session = emqx_session:init(#{zone => Zone}, Options),
-             emqx_session:set_field(next_pkt_id, 16#ffff, Session)
-         end).
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
+init_per_testcase(_TestCase, Config) ->
+    %% Meck Broker
+    ok = meck:new(emqx_broker, [passthrough, no_history]),
+    ok = meck:new(emqx_hooks, [passthrough, no_history]),
+    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(emqx_broker),
+    ok = meck:unload(emqx_hooks),
+    Config.
+
+%%--------------------------------------------------------------------
+%% Test cases for session init
+%%--------------------------------------------------------------------
+
+t_session_init(_) ->
+    Session = emqx_session:init(#{zone => zone}, #{receive_maximum => 64}),
+    ?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
+    ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
+    ?assertEqual(0, emqx_session:info(subscriptions_max, Session)),
+    ?assertEqual(false, emqx_session:info(upgrade_qos, Session)),
+    ?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
+    ?assertEqual(64, emqx_session:info(inflight_max, Session)),
+    ?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
+    ?assertEqual(0, emqx_session:info(retry_interval, Session)),
+    ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
+    ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
+    ?assertEqual(3600000, emqx_session:info(awaiting_rel_timeout, Session)),
+    ?assert(is_integer(emqx_session:info(created_at, Session))).
+
+%%--------------------------------------------------------------------
+%% Test cases for session info/stats
+%%--------------------------------------------------------------------
+
+t_session_info(_) ->
+    Info = emqx_session:info(session()),
+    ?assertMatch(#{subscriptions := #{},
+                   subscriptions_max := 0,
+                   upgrade_qos := false,
+                   inflight_max := 0,
+                   retry_interval := 0,
+                   mqueue_len := 0,
+                   mqueue_max := 1000,
+                   mqueue_dropped := 0,
+                   next_pkt_id := 1,
+                   awaiting_rel := #{},
+                   awaiting_rel_max := 100,
+                   awaiting_rel_timeout := 3600000
+                  }, Info).
+
+t_session_attrs(_) ->
+    Attrs = emqx_session:attrs(session()),
+    io:format("~p~n", [Attrs]).
+
+t_session_stats(_) ->
+    Stats = emqx_session:stats(session()),
+    io:format("~p~n", [Stats]).
+
+%%--------------------------------------------------------------------
+%% Test cases for pub/sub
+%%--------------------------------------------------------------------
+
+t_subscribe(_) ->
+    ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
+    ok = meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
+    {ok, Session} = emqx_session:subscribe(
+                      clientinfo(), <<"#">>, subopts(), session()),
+    ?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)).
+
+t_is_subscriptions_full_false(_) ->
+    Session = session(#{max_subscriptions => 0}),
+    ?assertNot(emqx_session:is_subscriptions_full(Session)).
+
+t_is_subscriptions_full_true(_) ->
+    Session = session(#{max_subscriptions => 1}),
+    ?assertNot(emqx_session:is_subscriptions_full(Session)),
+    Subs = #{<<"t1">> => subopts(), <<"t2">> => subopts()},
+    NSession = set_field(subscriptions, Subs, Session),
+    ?assert(emqx_session:is_subscriptions_full(NSession)).
+
+t_unsubscribe(_) ->
+    ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
+    Session = session(#{subscriptions => #{<<"#">> => subopts()}}),
+    {ok, NSession} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session),
+    Error = emqx_session:unsubscribe(clientinfo(), <<"#">>, NSession),
+    ?assertEqual({error, ?RC_NO_SUBSCRIPTION_EXISTED}, Error).
+
+t_publish_qos2(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(1, Msg, session()),
+    ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)).
+
+t_publish_qos1(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(1, Msg, session()).
+
+t_publish_qos0(_) ->
+    ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    {ok, [], Session} = emqx_session:publish(0, Msg, session()).
+
+t_is_awaiting_full_false(_) ->
+    ?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))).
+
+t_is_awaiting_full_true(_) ->
+    Session = session(#{max_awaiting_rel => 1,
+                        awaiting_rel => #{1 => 1}
+                       }),
+    ?assert(emqx_session:is_awaiting_full(Session)).
+
+t_puback(_) ->
+    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
+    Inflight = emqx_inflight:insert(1, {Msg, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {ok, Msg, NSession} = emqx_session:puback(1, Session),
+    ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
+
+t_puback_error_packet_id_in_use(_) ->
+    Inflight = emqx_inflight:insert(1, {pubrel, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
+
+t_puback_error_packet_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
+
+t_pubrec(_) ->
+    Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
+    Inflight = emqx_inflight:insert(2, {Msg, os:timestamp()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {ok, Msg, NSession} = emqx_session:pubrec(2, Session),
+    ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, NSession))).
+
+t_pubrec_packet_id_in_use_error(_) ->
+    Inflight = emqx_inflight:insert(1, {pubrel, ts()}, emqx_inflight:new()),
+    Session = set_field(inflight, Inflight, session()),
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
+
+t_pubrec_packet_id_not_found_error(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
+
+t_pubrel(_) ->
+    Session = set_field(awaiting_rel, #{1 => os:timestamp()}, session()),
+    {ok, NSession} = emqx_session:pubrel(1, Session),
+    ?assertEqual(#{}, emqx_session:info(awaiting_rel, NSession)).
+
+t_pubrel_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
+
+t_pubcomp(_) ->
+    Inflight = emqx_inflight:insert(2, {pubrel, os:timestamp()}, emqx_inflight:new()),
+    Session = emqx_session:set_field(inflight, Inflight, session()),
+    {ok, NSession} = emqx_session:pubcomp(2, Session),
+    ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
+
+t_pubcomp_id_not_found(_) ->
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()).
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%
-%%% Internal functions %%%
-%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%--------------------------------------------------------------------
+%% Test cases for deliver/retry
+%%--------------------------------------------------------------------
+
+t_dequeue(_) ->
+    {ok, Session} = emqx_session:dequeue(session()).
+
+t_deliver(_) ->
+    Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
+    {ok, Publishes, _Session} = emqx_session:deliver(Delivers, session()),
+    ?assertEqual(2, length(Publishes)).
+
+t_enqueue(_) ->
+    Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
+    Session = emqx_session:enqueue(Delivers, session()),
+    ?assertEqual(2, emqx_session:info(mqueue_len, Session)).
+
+t_retry(_) ->
+    {ok, _Session} = emqx_session:retry(session()).
+
+%%--------------------------------------------------------------------
+%% Test cases for takeover/resume
+%%--------------------------------------------------------------------
+
+t_takeover(_) ->
+    ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
+    Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
+    ok = emqx_session:takeover(Session).
+
+t_resume(_) ->
+    ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
+    Subs = #{<<"t">> => ?DEFAULT_SUBOPTS},
+    Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
+    ok = emqx_session:resume(<<"clientid">>, Session).
+
+t_redeliver(_) ->
+    {ok, [], _Session} = emqx_session:redeliver(session()).
+
+t_expire(_) ->
+    {ok, _Session} = emqx_session:expire(awaiting_rel, session()).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
 
-make_subopts(RH, RAP, NL, QOS, SHARE, SubId) ->
-    #{rh => RH,
-      rap => RAP,
-      nl => NL,
-      qos => QOS,
-      share => SHARE,
-      subid => SubId}.
+session() -> session(#{}).
+session(InitFields) when is_map(InitFields) ->
+    maps:fold(fun(Field, Value, Session) ->
+                      emqx_session:set_field(Field, Value, Session)
+              end,
+              emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
+              InitFields).
 
 
-load(Modules) ->
-    [mock(Module) || Module <- Modules],
-    ok.
+clientinfo() -> clientinfo(#{}).
+clientinfo(Init) ->
+    maps:merge(#{clientid => <<"clientid">>,
+                 username => <<"username">>
+                }, Init).
 
-unload(Modules) ->
-    lists:foreach(fun(Module) ->
-                          ok = meck:unload(Module)
-                  end, Modules).
+subopts() -> subopts(#{}).
+subopts(Init) ->
+    maps:merge(?DEFAULT_SUBOPTS, Init).
 
-mock(Module) ->
-    ok = meck:new(Module, [passthrough, no_history]),
-    do_mock(Module).
+delivery(QoS, Topic) ->
+    {deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}.
 
-do_mock(emqx_metrics) ->
-    meck:expect(emqx_metrics, inc, fun(_Anything) -> ok end);
-do_mock(emqx_broker) ->
-    meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
-    meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
-    meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
-    meck:expect(emqx_broker, publish, fun(_) -> ok end);
-do_mock(emqx_misc) ->
-    meck:expect(emqx_misc, start_timer, fun(_, _) -> tref end);
-do_mock(emqx_message) ->
-    meck:expect(emqx_message, set_header, fun(_Hdr, _Val, Msg) -> Msg end),
-    meck:expect(emqx_message, is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end);
-do_mock(emqx_hooks) ->
-    meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end);
-do_mock(emqx_zone) ->
-    meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end).
+ts() -> erlang:system_time(second).
 

+ 206 - 29
test/emqx_ws_connection_SUITE.erl

@@ -16,42 +16,219 @@
 
 -module(emqx_ws_connection_SUITE).
 
+-include("emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("eunit/include/eunit.hrl").
+-import(emqx_ws_connection,
+        [ websocket_handle/2
+        , websocket_info/2
+        ]).
+
+-define(STATS_KEYS, [recv_oct, recv_cnt, send_oct, send_cnt,
+                     recv_pkt, recv_msg, send_pkt, send_msg
+                    ]).
 
 all() -> emqx_ct:all(?MODULE).
 
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
 init_per_suite(Config) ->
-    emqx_ct_helpers:boot_modules(all),
-    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([]).
-
-t_basic(_) ->
-    Topic = <<"TopicA">>,
-    {ok, C} = emqtt:start_link([{host, "127.0.0.1"}, {port, 8083}]),
-    {ok, _} = emqtt:ws_connect(C),
-    {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
-    {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
-    ?assertEqual(3, length(recv_msgs(3))),
-    ok = emqtt:disconnect(C).
-
-recv_msgs(Count) ->
-    recv_msgs(Count, []).
-
-recv_msgs(0, Msgs) ->
-    Msgs;
-recv_msgs(Count, Msgs) ->
-    receive
-        {publish, Msg} ->
-            recv_msgs(Count-1, [Msg|Msgs])
-    after 100 ->
-        Msgs
-    end.
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    %% Meck CowboyReq
+    ok = meck:new(cowboy_req, [passthrough, no_history]),
+    ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end),
+    ok = meck:expect(cowboy_req, sock, fun(_) -> {{127,0,0,1}, 8883} end),
+    ok = meck:expect(cowboy_req, cert, fun(_) -> undefined end),
+    ok = meck:expect(cowboy_req, parse_cookies, fun(_) -> undefined end),
+    %% Meck Channel
+    ok = meck:new(emqx_channel, [passthrough, no_history]),
+    ok = meck:expect(emqx_channel, recvd,
+                     fun(_Oct, Channel) ->
+                             {ok, Channel}
+                     end),
+    %% Meck Metrics
+    ok = meck:new(emqx_metrics, [passthrough, no_history]),
+    ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
+    ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
+    ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    ok = meck:unload(cowboy_req),
+    ok = meck:unload(emqx_channel),
+    ok = meck:unload(emqx_metrics),
+    Config.
+
+%%--------------------------------------------------------------------
+%% Test Cases
+%%--------------------------------------------------------------------
+
+%%TODO:...
+t_ws_conn_init(_) ->
+    with_ws_conn(fun(_WsConn) -> ok end).
+
+t_ws_conn_info(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         #{sockinfo := SockInfo} = emqx_ws_connection:info(WsConn),
+                         #{socktype  := ws,
+                           peername  := {{127,0,0,1}, 3456},
+                           sockname  := {{127,0,0,1}, 8883},
+                           sockstate := idle} = SockInfo
+                 end).
+
+t_ws_conn_stats(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         Stats = emqx_ws_connection:stats(WsConn),
+                         lists:foreach(fun(Key) ->
+                                               0 = proplists:get_value(Key, Stats)
+                                       end, ?STATS_KEYS)
+                 end).
+
+t_websocket_init(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         #{sockinfo := SockInfo} = emqx_ws_connection:info(WsConn),
+                         #{socktype  := ws,
+                           peername  := {{127,0,0,1}, 3456},
+                           sockname  := {{127,0,0,1}, 8883},
+                           sockstate := idle
+                          } = SockInfo
+                 end).
+
+t_websocket_handle_binary(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         ok = meck:expect(emqx_channel, recvd, fun(_Oct, Channel) -> {ok, Channel} end),
+                         {ok, WsConn} = websocket_handle({binary, [<<>>]}, WsConn)
+                 end).
+
+t_websocket_handle_ping_pong(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         {ok, WsConn} = websocket_handle(ping, WsConn),
+                         {ok, WsConn} = websocket_handle(pong, WsConn),
+                         {ok, WsConn} = websocket_handle({ping, <<>>}, WsConn),
+                         {ok, WsConn} = websocket_handle({pong, <<>>}, WsConn)
+                 end).
+
+t_websocket_handle_bad_frame(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         {stop, WsConn1} = websocket_handle({badframe, <<>>}, WsConn),
+                         ?assertEqual({shutdown, unexpected_ws_frame}, stop_reason(WsConn1))
+                 end).
+
+t_websocket_info_call(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         From = {make_ref(), self()},
+                         Call = {call, From, badreq},
+                         websocket_info(Call, WsConn)
+                 end).
+
+t_websocket_info_cast(_) ->
+    ok = meck:expect(emqx_channel, handle_info, fun(_Msg, Channel) -> {ok, Channel} end),
+    with_ws_conn(fun(WsConn) -> websocket_info({cast, msg}, WsConn) end).
+
+t_websocket_info_incoming(_) ->
+    ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+    with_ws_conn(fun(WsConn) ->
+                         Connect = ?CONNECT_PACKET(
+                                      #mqtt_packet_connect{proto_ver   = ?MQTT_PROTO_V5,
+                                                           proto_name  = <<"MQTT">>,
+                                                           clientid    = <<>>,
+                                                           clean_start = true,
+                                                           keepalive   = 60}),
+                         {ok, WsConn1} = websocket_info({incoming, Connect}, WsConn),
+                         Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
+                         {ok, _WsConn2} = websocket_info({incoming, Publish}, WsConn1)
+                 end).
+
+t_websocket_info_deliver(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         ok = meck:expect(emqx_channel, handle_out,
+                                          fun(Delivers, Channel) ->
+                                                  Packets = [emqx_message:to_packet(1, Msg) || {deliver, _, Msg} <- Delivers],
+                                                  {ok, {outgoing, Packets}, Channel}
+                                          end),
+                         Deliver = {deliver, <<"#">>, emqx_message:make(<<"topic">>, <<"payload">>)},
+                         {reply, {binary, _Data}, _WsConn1} = websocket_info(Deliver, WsConn)
+                 end).
+
+t_websocket_info_timeout(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         websocket_info({timeout, make_ref(), keepalive}, WsConn),
+                         websocket_info({timeout, make_ref(), emit_stats}, WsConn),
+                         websocket_info({timeout, make_ref(), retry_delivery}, WsConn)
+                 end).
+
+t_websocket_info_close(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         {stop, WsConn1} = websocket_info({close, sock_error}, WsConn),
+                         ?assertEqual({shutdown, sock_error}, stop_reason(WsConn1))
+                 end).
+
+t_websocket_info_shutdown(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         {stop, WsConn1} = websocket_info({shutdown, reason}, WsConn),
+                         ?assertEqual({shutdown, reason}, stop_reason(WsConn1))
+                 end).
+
+
+t_websocket_info_stop(_) ->
+    with_ws_conn(fun(WsConn) ->
+                         {stop, WsConn1} = websocket_info({stop, normal}, WsConn),
+                         ?assertEqual(normal, stop_reason(WsConn1))
+                 end).
+
+t_websocket_close(_) ->
+    ok = meck:expect(emqx_channel, handle_info,
+                     fun({sock_closed, badframe}, Channel) ->
+                             {shutdown, sock_closed, Channel}
+                     end),
+    with_ws_conn(fun(WsConn) ->
+                         {stop, WsConn1} = emqx_ws_connection:websocket_close(badframe, WsConn),
+                         ?assertEqual(sock_closed, stop_reason(WsConn1))
+                 end).
+
+t_handle_call(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_handle_info(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_handle_timeout(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_parse_incoming(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_handle_incoming(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_handle_return(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+t_handle_outgoing(_) ->
+    with_ws_conn(fun(WsConn) -> ok end).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+with_ws_conn(TestFun) ->
+    with_ws_conn(TestFun, []).
+
+with_ws_conn(TestFun, Opts) ->
+    {ok, WsConn} = emqx_ws_connection:websocket_init(
+                     [req, emqx_misc:merge_opts([{zone, external}], Opts)]),
+    TestFun(WsConn).
+
+stop_reason(WsConn) ->
+    emqx_ws_connection:info(stop_reason, WsConn).
+