Преглед изворни кода

Improve the process of handling MQTT control packets (#3079)

Feng Lee пре 6 година
родитељ
комит
9c3273a2c6

+ 2 - 2
etc/emqx.conf

@@ -520,7 +520,7 @@ mqtt.max_qos_allowed = 2
 ## Maximum Topic Alias, 0 means no topic alias supported.
 ##
 ## Value: 0-65535
-mqtt.max_topic_alias = 0
+mqtt.max_topic_alias = 65535
 
 ## Whether the Server supports MQTT retained messages.
 ##
@@ -633,7 +633,7 @@ zone.external.force_gc_policy = 1000|1MB
 ## Maximum Topic Alias, 0 means no limit.
 ##
 ## Value: 0-65535
-## zone.external.max_topic_alias = 0
+## zone.external.max_topic_alias = 65535
 
 ## Whether the Server supports retained messages.
 ##

+ 6 - 2
include/emqx_mqtt.hrl

@@ -178,8 +178,9 @@
 %% Maximum MQTT Packet ID and Length
 %%--------------------------------------------------------------------
 
--define(MAX_PACKET_ID, 16#ffff).
--define(MAX_PACKET_SIZE, 16#fffffff).
+-define(MAX_PACKET_ID, 16#FFFF).
+-define(MAX_PACKET_SIZE, 16#FFFFFFF).
+-define(MAX_TOPIC_AlIAS, 16#FFFF).
 
 %%--------------------------------------------------------------------
 %% MQTT Frame Mask
@@ -318,6 +319,9 @@
 %% MQTT Packet Match
 %%--------------------------------------------------------------------
 
+-define(CONNECT_PACKET(),
+        #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}).
+
 -define(CONNECT_PACKET(Var),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?CONNECT},
                  variable = Var}).

+ 5 - 5
priv/emqx.schema

@@ -685,10 +685,10 @@ end}.
   {validators, ["range:0-2"]}
 ]}.
 
-%% @doc Set the Maximum topic alias.
+%% @doc Set the Maximum Topic Alias.
 {mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [
- {default, 0},
- {datatype, integer}
+  {default, 65535},
+  {datatype, integer}
 ]}.
 
 %% @doc Whether the server supports MQTT retained messages.
@@ -848,7 +848,7 @@ end}.
 %% @doc Retry interval for redelivering QoS1/2 messages.
 {mapping, "zone.$name.retry_interval", "emqx.zones", [
   {default, "20s"},
-  {datatype, {duration, ms}}
+  {datatype, {duration, s}}
 ]}.
 
 %% @doc Max Packets that Awaiting PUBREL, 0 means no limit
@@ -860,7 +860,7 @@ end}.
 %% @doc Awaiting PUBREL timeout
 {mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
   {default, "300s"},
-  {datatype, {duration, ms}}
+  {datatype, {duration, s}}
 ]}.
 
 %% @doc Ignore loop delivery of messages

+ 17 - 19
src/emqx_access_control.erl

@@ -24,56 +24,54 @@
         , reload_acl/0
         ]).
 
+-type(result() :: #{auth_result := emqx_types:auth_result(),
+                    anonymous := boolean()
+                   }).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
--spec(authenticate(emqx_types:clientinfo())
-      -> {ok, #{auth_result := emqx_types:auth_result(),
-                anonymous := boolean}} | {error, term()}).
-authenticate(Client) ->
-    case emqx_hooks:run_fold('client.authenticate',
-                             [Client], default_auth_result(maps:get(zone, Client, undefined))) of
+-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
+authenticate(ClientInfo = #{zone := Zone}) ->
+    case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
     	Result = #{auth_result := success, anonymous := true} ->
             emqx_metrics:inc('auth.mqtt.anonymous'),
 	        {ok, Result};
         Result = #{auth_result := success} ->
 	        {ok, Result};
 	    Result ->
-	        {error, maps:get(auth_result, Result, unknown_error)}
+            {error, maps:get(auth_result, Result, unknown_error)}
     end.
 
 %% @doc Check ACL
--spec(check_acl(emqx_types:cient(), emqx_types:pubsub(), emqx_types:topic())
+-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
       -> allow | deny).
-check_acl(Client, PubSub, Topic) ->
+check_acl(ClientInfo, PubSub, Topic) ->
     case emqx_acl_cache:is_enabled() of
-        true ->
-            check_acl_cache(Client, PubSub, Topic);
-        false ->
-            do_check_acl(Client, PubSub, Topic)
+        true  -> check_acl_cache(ClientInfo, PubSub, Topic);
+        false -> do_check_acl(ClientInfo, PubSub, Topic)
     end.
 
-check_acl_cache(Client, PubSub, Topic) ->
+check_acl_cache(ClientInfo, PubSub, Topic) ->
     case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
         not_found ->
-            AclResult = do_check_acl(Client, PubSub, Topic),
+            AclResult = do_check_acl(ClientInfo, PubSub, Topic),
             emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
             AclResult;
         AclResult -> AclResult
     end.
 
-do_check_acl(#{zone := Zone} = Client, PubSub, Topic) ->
+do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
     Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
-    case emqx_hooks:run_fold('client.check_acl', [Client, PubSub, Topic], Default) of
+    case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
         allow  -> allow;
         _Other -> deny
     end.
 
 -spec(reload_acl() -> ok | {error, term()}).
 reload_acl() ->
-    emqx_acl_cache:is_enabled()
-        andalso emqx_acl_cache:empty_acl_cache(),
+    emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
     emqx_mod_acl_internal:reload_acl().
 
 default_auth_result(Zone) ->

+ 11 - 5
src/emqx_broker.erl

@@ -124,14 +124,16 @@ subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
 
 -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
 subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
-    SubPid = self(),
-    case ets:member(?SUBOPTION, {SubPid, Topic}) of
-        false ->
+    case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
+        false -> %% New
             ok = emqx_broker_helper:register_sub(SubPid, SubId),
             do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
-        true -> ok
+        true -> %% Existed
+            set_subopts(SubPid, Topic, with_subid(SubId, SubOpts)),
+            ok %% ensure to return 'ok'
     end.
 
+-compile({inline, [with_subid/2]}).
 with_subid(undefined, SubOpts) ->
     SubOpts;
 with_subid(SubId, SubOpts) ->
@@ -377,7 +379,11 @@ get_subopts(SubId, Topic) when ?is_subid(SubId) ->
 
 -spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
 set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
-    Sub = {self(), Topic},
+    set_subopts(self(), Topic, NewOpts).
+
+%% @private
+set_subopts(SubPid, Topic, NewOpts) ->
+    Sub = {SubPid, Topic},
     case ets:lookup(?SUBOPTION, Sub) of
         [{_, OldOpts}] ->
             ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});

Разлика између датотеке није приказан због своје велике величине
+ 478 - 350
src/emqx_channel.erl


+ 70 - 38
src/emqx_cm.erl

@@ -28,12 +28,16 @@
 -export([start_link/0]).
 
 -export([ register_channel/1
+        , register_channel/2
+        , register_channel/3
         , unregister_channel/1
         ]).
 
--export([ get_chan_attrs/1
-        , get_chan_attrs/2
-        , set_chan_attrs/2
+-export([connection_closed/1]).
+
+-export([ get_chan_info/1
+        , get_chan_info/2
+        , set_chan_info/2
         ]).
 
 -export([ get_chan_stats/1
@@ -66,15 +70,13 @@
 
 %% Tables for channel management.
 -define(CHAN_TAB, emqx_channel).
--define(CHAN_P_TAB, emqx_channel_p).
--define(CHAN_ATTRS_TAB, emqx_channel_attrs).
--define(CHAN_STATS_TAB, emqx_channel_stats).
+-define(CHAN_CONN_TAB, emqx_channel_conn).
+-define(CHAN_INFO_TAB, emqx_channel_info).
 
 -define(CHAN_STATS,
         [{?CHAN_TAB, 'channels.count', 'channels.max'},
-         {?CHAN_TAB, 'connections.count', 'connections.max'},
          {?CHAN_TAB, 'sessions.count', 'sessions.max'},
-         {?CHAN_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'}
+         {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
         ]).
 
 %% Batch drain
@@ -94,17 +96,28 @@ start_link() ->
 
 %% @doc Register a channel.
 -spec(register_channel(emqx_types:clientid()) -> ok).
-register_channel(ClientId) when is_binary(ClientId) ->
+register_channel(ClientId) ->
     register_channel(ClientId, self()).
 
 %% @doc Register a channel with pid.
 -spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
-register_channel(ClientId, ChanPid) ->
+register_channel(ClientId, ChanPid) when is_pid(ChanPid) ->
     Chan = {ClientId, ChanPid},
     true = ets:insert(?CHAN_TAB, Chan),
+    true = ets:insert(?CHAN_CONN_TAB, Chan),
     ok = emqx_cm_registry:register_channel(Chan),
     cast({registered, Chan}).
 
+%% @doc Register a channel with info and stats.
+-spec(register_channel(emqx_types:clientid(),
+                       emqx_types:infos(),
+                       emqx_types:stats()) -> ok).
+register_channel(ClientId, Info, Stats) ->
+    Chan = {ClientId, ChanPid = self()},
+    true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
+    register_channel(ClientId, ChanPid).
+
+%% @doc Unregister a channel.
 -spec(unregister_channel(emqx_types:clientid()) -> ok).
 unregister_channel(ClientId) when is_binary(ClientId) ->
     true = do_unregister_channel({ClientId, self()}),
@@ -113,52 +126,72 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
 %% @private
 do_unregister_channel(Chan) ->
     ok = emqx_cm_registry:unregister_channel(Chan),
-    true = ets:delete_object(?CHAN_P_TAB, Chan),
-    true = ets:delete(?CHAN_ATTRS_TAB, Chan),
-    true = ets:delete(?CHAN_STATS_TAB, Chan),
+    true = ets:delete_object(?CHAN_CONN_TAB, Chan),
+    true = ets:delete(?CHAN_INFO_TAB, Chan),
     ets:delete_object(?CHAN_TAB, Chan).
 
+-spec(connection_closed(emqx_types:clientid()) -> true).
+connection_closed(ClientId) ->
+    connection_closed(ClientId, self()).
+
+-spec(connection_closed(emqx_types:clientid(), chan_pid()) -> true).
+connection_closed(ClientId, ChanPid) ->
+    ets:delete_object(?CHAN_CONN_TAB, {ClientId, ChanPid}).
+
 %% @doc Get info of a channel.
--spec(get_chan_attrs(emqx_types:clientid()) -> maybe(emqx_types:attrs())).
-get_chan_attrs(ClientId) ->
-    with_channel(ClientId, fun(ChanPid) -> get_chan_attrs(ClientId, ChanPid) end).
+-spec(get_chan_info(emqx_types:clientid()) -> maybe(emqx_types:infos())).
+get_chan_info(ClientId) ->
+    with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end).
 
--spec(get_chan_attrs(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:attrs())).
-get_chan_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec(get_chan_info(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:infos())).
+get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
     Chan = {ClientId, ChanPid},
-    emqx_tables:lookup_value(?CHAN_ATTRS_TAB, Chan);
-get_chan_attrs(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_attrs, [ClientId, ChanPid]).
+    try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2)
+    catch
+        error:badarg -> undefined
+    end;
+get_chan_info(ClientId, ChanPid) ->
+    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
 
-%% @doc Set info of a channel.
--spec(set_chan_attrs(emqx_types:clientid(), emqx_types:attrs()) -> ok).
-set_chan_attrs(ClientId, Info) when is_binary(ClientId) ->
+%% @doc Update infos of the channel.
+-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
+set_chan_info(ClientId, Info) when is_binary(ClientId) ->
     Chan = {ClientId, self()},
-    true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Info}),
-    ok.
+    try ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info})
+    catch
+        error:badarg -> false
+    end.
 
 %% @doc Get channel's stats.
 -spec(get_chan_stats(emqx_types:clientid()) -> maybe(emqx_types:stats())).
 get_chan_stats(ClientId) ->
     with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
 
--spec(get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats())).
+-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:stats())).
 get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
     Chan = {ClientId, ChanPid},
-    emqx_tables:lookup_value(?CHAN_STATS_TAB, Chan);
+    try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3)
+    catch
+        error:badarg -> undefined
+    end;
 get_chan_stats(ClientId, ChanPid) ->
     rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
 
 %% @doc Set channel's stats.
--spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> ok).
+-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
 set_chan_stats(ClientId, Stats) when is_binary(ClientId) ->
     set_chan_stats(ClientId, self(), Stats).
 
--spec(set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) -> ok).
+-spec(set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats())
+      -> boolean()).
 set_chan_stats(ClientId, ChanPid, Stats) ->
     Chan = {ClientId, ChanPid},
-    true = ets:insert(?CHAN_STATS_TAB, {Chan, Stats}),
-    ok.
+    try ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats})
+    catch
+        error:badarg -> false
+    end.
 
 %% @doc Open a session.
 -spec(open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo())
@@ -208,7 +241,7 @@ takeover_session(ClientId) ->
     end.
 
 takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-    case get_chan_attrs(ClientId, ChanPid) of
+    case get_chan_info(ClientId, ChanPid) of
         #{conninfo := #{conn_mod := ConnMod}} ->
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
             {ok, ConnMod, ChanPid, Session};
@@ -237,7 +270,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
     end.
 
 discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-    case get_chan_attrs(ClientId, ChanPid) of
+    case get_chan_info(ClientId, ChanPid) of
         #{conninfo := #{conn_mod := ConnMod}} ->
             ConnMod:call(ChanPid, discard);
         undefined -> ok
@@ -291,10 +324,9 @@ cast(Msg) -> gen_server:cast(?CM, Msg).
 
 init([]) ->
     TabOpts = [public, {write_concurrency, true}],
-    ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
-    ok = emqx_tables:new(?CHAN_P_TAB, [bag | TabOpts]),
-    ok = emqx_tables:new(?CHAN_ATTRS_TAB, [set, compressed | TabOpts]),
-    ok = emqx_tables:new(?CHAN_STATS_TAB, [set | TabOpts]),
+    ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
+    ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
+    ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
     ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
     {ok, #{chan_pmon => emqx_pmon:new()}}.
 

+ 53 - 63
src/emqx_connection.erl

@@ -92,7 +92,7 @@
 -type(state() :: #state{}).
 
 -define(ACTIVE_N, 100).
--define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
@@ -115,8 +115,9 @@ info(CPid) when is_pid(CPid) ->
     call(CPid, info);
 info(State = #state{channel = Channel}) ->
     ChanInfo = emqx_channel:info(Channel),
-    SockInfo = maps:from_list(info(?INFO_KEYS, State)),
-    maps:merge(ChanInfo, #{sockinfo => SockInfo}).
+    SockInfo = maps:from_list(
+                 info(?INFO_KEYS, State)),
+    ChanInfo#{sockinfo => SockInfo}.
 
 info(Keys, State) when is_list(Keys) ->
     [{Key, info(Key, State)} || Key <- Keys];
@@ -149,13 +150,6 @@ stats(#state{transport = Transport,
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
-attrs(#state{active_n = ActiveN, sockstate = SockSt, channel = Channel}) ->
-    SockAttrs = #{active_n  => ActiveN,
-                  sockstate => SockSt
-                 },
-    ChanAttrs = emqx_channel:attrs(Channel),
-    maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
-
 call(Pid, Req) ->
     gen_server:call(Pid, Req, infinity).
 
@@ -326,41 +320,42 @@ handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
 handle_msg({incoming, Packet}, State) ->
     handle_incoming(Packet, State);
 
+handle_msg({outgoing, Packets}, State) ->
+    handle_outgoing(Packets, State);
+
 handle_msg({Error, _Sock, Reason}, State)
   when Error == tcp_error; Error == ssl_error ->
     handle_info({sock_error, Reason}, State);
 
 handle_msg({Closed, _Sock}, State)
   when Closed == tcp_closed; Closed == ssl_closed ->
-    handle_info(sock_closed, State);
+    handle_info({sock_closed, Closed}, close_socket(State));
 
 handle_msg({Passive, _Sock}, State)
   when Passive == tcp_passive; Passive == ssl_passive ->
-    InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
-                oct => emqx_pd:reset_counter(incoming_bytes)
-               },
+    %% In Stats
+    Pubs = emqx_pd:reset_counter(incoming_pubs),
+    Bytes = emqx_pd:reset_counter(incoming_bytes),
+    InStats = #{cnt => Pubs, oct => Bytes},
     %% Ensure Rate Limit
     NState = ensure_rate_limit(InStats, State),
     %% Run GC and Check OOM
     NState1 = check_oom(run_gc(InStats, NState)),
     handle_info(activate_socket, NState1);
 
-handle_msg(Deliver = {deliver, _Topic, _Msg},
-           State = #state{channel = Channel}) ->
-    Delivers = [Deliver|emqx_misc:drain_deliver()],
-    Ret = emqx_channel:handle_out(Delivers, Channel),
+handle_msg(Deliver = {deliver, _Topic, _Msg}, State =
+           #state{active_n = ActiveN, channel = Channel}) ->
+    Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
+    Ret = emqx_channel:handle_deliver(Delivers, Channel),
     handle_chan_return(Ret, State);
 
-handle_msg({outgoing, Packets}, State) ->
-    handle_outgoing(Packets, State);
-
 %% Something sent
 handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
     case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
         true ->
-            OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
-                         oct => emqx_pd:reset_counter(outgoing_bytes)
-                        },
+            Pubs = emqx_pd:reset_counter(outgoing_pubs),
+            Bytes = emqx_pd:reset_counter(outgoing_bytes),
+            OutStats = #{cnt => Pubs, oct => Bytes},
             {ok, check_oom(run_gc(OutStats, State))};
         false -> ok
     end;
@@ -368,6 +363,29 @@ handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 
+handle_msg({connack, ConnAck}, State) ->
+    handle_outgoing(ConnAck, State);
+
+handle_msg({close, Reason}, State) ->
+    ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
+    handle_info({sock_closed, Reason}, close_socket(State));
+
+handle_msg({event, connected}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:register_channel(ClientId, info(State), stats(State));
+
+handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_info(ClientId, info(State)),
+    emqx_cm:connection_closed(ClientId),
+    {ok, State};
+
+handle_msg({event, _Other}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_info(ClientId, info(State)),
+    emqx_cm:set_chan_stats(ClientId, stats(State)),
+    {ok, State};
+
 handle_msg({timeout, TRef, TMsg}, State) ->
     handle_timeout(TRef, TMsg, State);
 
@@ -434,15 +452,14 @@ handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
                         },
     handle_info(activate_socket, NState);
 
-handle_timeout(TRef, emit_stats, State = #state{stats_timer = TRef,
-                                                channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    (ClientId =/= undefined) andalso
-        emqx_cm:set_chan_stats(ClientId, stats(State)),
+handle_timeout(TRef, emit_stats, State =
+               #state{stats_timer = TRef, channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_stats(ClientId, stats(State)),
     {ok, State#state{stats_timer = undefined}};
 
-handle_timeout(TRef, keepalive, State = #state{transport = Transport,
-                                               socket    = Socket}) ->
+handle_timeout(TRef, keepalive, State =
+               #state{transport = Transport, socket = Socket}) ->
     case Transport:getstat(Socket, [recv_oct]) of
         {ok, [{recv_oct, RecvOct}]} ->
             handle_timeout(TRef, {keepalive, RecvOct}, State);
@@ -479,6 +496,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
             {[{frame_error, Reason}|Packets], State}
     end.
 
+-compile({inline, [next_incoming_msgs/1]}).
 next_incoming_msgs([Packet]) ->
     {incoming, Packet};
 next_incoming_msgs(Packets) ->
@@ -552,51 +570,24 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    ok = emqx_cm:register_channel(ClientId),
-    ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
-    ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
-    ok = handle_outgoing(ConnAck, State);
-
-handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    emqx_cm:set_chan_attrs(ClientId, attrs(State)),
-    emqx_cm:set_chan_stats(ClientId, stats(State));
-
 handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     case activate_socket(State) of
         {ok, NState = #state{sockstate = NewSst}} ->
             if OldSst =/= NewSst ->
-                   {ok, {event, sockstate_changed}, NState};
+                   {ok, {event, NewSst}, NState};
                true -> {ok, NState}
             end;
         {error, Reason} ->
             handle_info({sock_error, Reason}, State)
     end;
 
-handle_info({event, sockstate_changed}, State = #state{channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    ClientId =/= undefined andalso emqx_cm:set_chan_attrs(ClientId, attrs(State));
-
-%%TODO: this is not right
-handle_info({sock_error, _Reason}, #state{sockstate = closed}) ->
-    ok;
 handle_info({sock_error, Reason}, State) ->
     ?LOG(debug, "Socket error: ~p", [Reason]),
     handle_info({sock_closed, Reason}, close_socket(State));
 
-handle_info(sock_closed, #state{sockstate = closed}) -> ok;
-handle_info(sock_closed, State) ->
-    ?LOG(debug, "Socket closed"),
-    handle_info({sock_closed, closed}, close_socket(State));
-
-handle_info({close, Reason}, State) ->
-    ?LOG(debug, "Force close due to : ~p", [Reason]),
-    handle_info({sock_closed, Reason}, close_socket(State));
-
 handle_info(Info, State = #state{channel = Channel}) ->
-    handle_chan_return(emqx_channel:handle_info(Info, Channel), State).
+    Ret = emqx_channel:handle_info(Info, Channel),
+    handle_chan_return(Ret, State).
 
 %%--------------------------------------------------------------------
 %% Ensure rate limit
@@ -655,8 +646,7 @@ activate_socket(State = #state{transport = Transport,
 %%--------------------------------------------------------------------
 %% Close Socket
 
-close_socket(State = #state{sockstate = closed}) ->
-    State;
+close_socket(State = #state{sockstate = closed}) -> State;
 close_socket(State = #state{transport = Transport, socket = Socket}) ->
     ok = Transport:fast_close(Socket),
     State#state{sockstate = closed}.

+ 2 - 0
src/emqx_metrics.erl

@@ -101,6 +101,7 @@
     {counter, 'packets.pubrel.missed'},         % PUBREL packets missed
     {counter, 'packets.pubcomp.received'},      % PUBCOMP packets received
     {counter, 'packets.pubcomp.sent'},          % PUBCOMP packets sent
+    {counter, 'packets.pubcomp.inuse'},         % PUBCOMP packet_id inuse
     {counter, 'packets.pubcomp.missed'},        % PUBCOMP packets missed
     {counter, 'packets.subscribe.received'},    % SUBSCRIBE Packets received
     {counter, 'packets.subscribe.error'},       % SUBSCRIBE error
@@ -460,5 +461,6 @@ reserved_idx('messages.forward')             -> 51;
 reserved_idx('auth.mqtt.anonymous')          -> 52;
 reserved_idx('channel.gc.cnt')               -> 53;
 reserved_idx('packets.pubrec.inuse')         -> 54;
+reserved_idx('packets.pubcomp.inuse')        -> 55;
 reserved_idx(_)                              -> undefined.
 

+ 11 - 5
src/emqx_misc.erl

@@ -29,6 +29,7 @@
         , start_timer/3
         , cancel_timer/1
         , drain_deliver/0
+        , drain_deliver/1
         , drain_down/1
         , check_oom/1
         , check_oom/2
@@ -112,14 +113,19 @@ cancel_timer(Timer) when is_reference(Timer) ->
     end;
 cancel_timer(_) -> ok.
 
-%% @doc Drain delivers from the channel proc's mailbox.
+%% @doc Drain delivers
 drain_deliver() ->
-    drain_deliver([]).
+    drain_deliver(-1).
 
-drain_deliver(Acc) ->
+drain_deliver(N) when is_integer(N) ->
+    drain_deliver(N, []).
+
+drain_deliver(0, Acc) ->
+    lists:reverse(Acc);
+drain_deliver(N, Acc) ->
     receive
         Deliver = {deliver, _Topic, _Msg} ->
-            drain_deliver([Deliver|Acc])
+            drain_deliver(N-1, [Deliver|Acc])
     after 0 ->
         lists:reverse(Acc)
     end.
@@ -134,7 +140,7 @@ drain_down(0, Acc) ->
 drain_down(Cnt, Acc) ->
     receive
         {'DOWN', _MRef, process, Pid, _Reason} ->
-            drain_down(Cnt - 1, [Pid|Acc])
+            drain_down(Cnt-1, [Pid|Acc])
     after 0 ->
         lists:reverse(Acc)
     end.

+ 1 - 1
src/emqx_mqtt_caps.erl

@@ -59,7 +59,7 @@
 
 -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
                         max_clientid_len => ?MAX_CLIENTID_LEN,
-                        max_topic_alias => ?UNLIMITED,
+                        max_topic_alias => ?MAX_TOPIC_AlIAS,
                         max_topic_levels => ?UNLIMITED,
                         max_qos_allowed => ?QOS_2,
                         retain_available => true,

+ 35 - 17
src/emqx_packet.erl

@@ -37,6 +37,7 @@
         ]).
 
 -export([ to_message/2
+        , to_message/3
         , will_msg/1
         ]).
 
@@ -110,7 +111,10 @@ check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscr
 check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
     check(UnsubPkt);
 
-check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) ->
+%% A Topic Alias of 0 is not permitted.
+check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= 0}}) ->
+    {error, ?RC_PROTOCOL_ERROR};
+check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _Alias}}) ->
     ok;
 check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) ->
     {error, ?RC_PROTOCOL_ERROR};
@@ -174,9 +178,9 @@ check(ConnPkt, Opts) when is_record(ConnPkt, mqtt_packet_connect) ->
 
 check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
                                      proto_name = Name}, _Opts) ->
-    case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
-        true  -> ok;
-        false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
+    case proplists:get_value(Ver, ?PROTOCOL_NAMES) of
+        Name   -> ok;
+        _Other -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
     end.
 
 %% MQTT3.1 does not allow null clientId
@@ -191,7 +195,7 @@ check_client_id(#mqtt_packet_connect{clientid    = <<>>,
                                      clean_start = true}, _Opts) ->
     ok;
 check_client_id(#mqtt_packet_connect{clientid = ClientId},
-                _Opts = #{max_clientid_len := MaxLen}) ->
+                #{max_clientid_len := MaxLen} = _Opts) ->
     case (1 =< (Len = byte_size(ClientId))) andalso (Len =< MaxLen) of
         true  -> ok;
         false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
@@ -226,7 +230,7 @@ run_checks([], _Packet, _Options) ->
 run_checks([Check|More], Packet, Options) ->
     case Check(Packet, Options) of
         ok -> run_checks(More, Packet, Options);
-        {error, Reason} -> {error, Reason}
+        Error = {error, _Reason} -> Error
     end.
 
 %% @doc Validate MQTT Packet
@@ -239,20 +243,34 @@ validate_topic_filters(TopicFilters) ->
               emqx_topic:validate(TopicFilter)
       end, TopicFilters).
 
-%% @doc Publish Packet to Message.
 -spec(to_message(emqx_types:clientinfo(), emqx_ypes:packet()) -> emqx_types:message()).
-to_message(#{clientid := ClientId, username := Username, peerhost := PeerHost},
+to_message(ClientInfo, Packet) ->
+    to_message(ClientInfo, #{}, Packet).
+
+%% @doc Transform Publish Packet to Message.
+-spec(to_message(emqx_types:clientinfo(), map(), emqx_ypes:packet())
+      -> emqx_types:message()).
+to_message(#{protocol := Protocol,
+             clientid := ClientId,
+             username := Username,
+             peerhost := PeerHost
+            }, Headers,
            #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
                                                        retain = Retain,
                                                        qos    = QoS,
-                                                       dup    = Dup},
+                                                       dup    = Dup
+                                                      },
                         variable = #mqtt_packet_publish{topic_name = Topic,
-                                                        properties = Props},
-                        payload  = Payload}) ->
+                                                        properties = Props
+                                                       },
+                        payload  = Payload
+                       }) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
-    Msg#message{flags = #{dup => Dup, retain => Retain},
-                headers = merge_props(#{username => Username,
-                                        peerhost => PeerHost}, Props)}.
+    Headers1 = merge_props(Headers#{protocol => Protocol,
+                                    username => Username,
+                                    peerhost => PeerHost
+                                   }, Props),
+    Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Headers1}.
 
 -spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
 will_msg(#mqtt_packet_connect{will_flag = false}) ->
@@ -262,11 +280,11 @@ will_msg(#mqtt_packet_connect{clientid     = ClientId,
                               will_retain  = Retain,
                               will_qos     = QoS,
                               will_topic   = Topic,
-                              will_props   = Properties,
+                              will_props   = Props,
                               will_payload = Payload}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
-    Msg#message{flags = #{dup => false, retain => Retain},
-                headers = merge_props(#{username => Username}, Properties)}.
+    Headers = merge_props(#{username => Username}, Props),
+    Msg#message{flags = #{dup => false, retain => Retain}, headers = Headers}.
 
 merge_props(Headers, undefined) ->
     Headers;

+ 7 - 8
src/emqx_reason_codes.erl

@@ -23,9 +23,10 @@
         , name/2
         , text/1
         , text/2
+        ]).
+
+-export([ frame_error/1
         , connack_error/1
-        , mqtt_frame_error/1
-        , formalized/2
         ]).
 
 -export([compat/2]).
@@ -165,6 +166,9 @@ compat(suback, Code) when Code >= 16#80  -> 16#80;
 compat(unsuback, _Code) -> undefined;
 compat(_Other, _Code) -> undefined.
 
+frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE;
+frame_error(_) -> ?RC_MALFORMED_PACKET.
+
 connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
 connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
 connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
@@ -175,11 +179,6 @@ connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
 connack_error(server_busy) -> ?RC_SERVER_BUSY;
 connack_error(banned) -> ?RC_BANNED;
 connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
+%% TODO: ???
 connack_error(_) -> ?RC_NOT_AUTHORIZED.
 
-mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE;
-mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET.
-
-formalized(connack, Code) when is_integer(Code) -> Code;
-formalized(connack, _Code) ->
-    ?RC_SERVER_UNAVAILABLE.

+ 114 - 139
src/emqx_session.erl

@@ -59,7 +59,6 @@
 
 -export([ info/1
         , info/2
-        , attrs/1
         , stats/1
         ]).
 
@@ -86,7 +85,7 @@
 
 -export([expire/2]).
 
-%% export for ct
+%% Export for ct
 -export([set_field/3]).
 
 -export_type([session/0]).
@@ -109,49 +108,27 @@
           mqueue :: emqx_mqueue:mqueue(),
           %% Next packet id of the session
           next_pkt_id = 1 :: emqx_types:packet_id(),
-          %% Retry interval for redelivering QoS1/2 messages
+          %% Retry interval for redelivering QoS1/2 messages (Unit: second)
           retry_interval :: timeout(),
-          %% Client -> Broker: QoS2 messages received from client and waiting for pubrel.
+          %% Client -> Broker: QoS2 messages received from client and
+          %% waiting for pubrel.
           awaiting_rel :: map(),
           %% Max Packets Awaiting PUBREL
           max_awaiting_rel :: non_neg_integer(),
-          %% Awaiting PUBREL Timeout
-          awaiting_rel_timeout :: timeout(),
-          %% Deliver Stats
-          deliver_stats :: emqx_types:stats(),
+          %% Awaiting PUBREL Timeout (Unit: second)
+          await_rel_timeout :: timeout(),
           %% Created at
           created_at :: pos_integer()
          }).
 
 -opaque(session() :: #session{}).
 
--type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
-
--define(DEFAULT_BATCH_N, 1000).
-
--define(ATTR_KEYS, [inflight_cnt,
-                    inflight_max,
-                    mqueue_len,
-                    mqueue_max,
-                    retry_interval,
-                    awaiting_rel_max,
-                    awaiting_rel_timeout,
-                    created_at
-                   ]).
+-type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
 
 -define(INFO_KEYS, [subscriptions,
-                    subscriptions_max,
                     upgrade_qos,
-                    inflight,
-                    inflight_max,
                     retry_interval,
-                    mqueue_len,
-                    mqueue_max,
-                    mqueue_dropped,
-                    next_pkt_id,
-                    awaiting_rel,
-                    awaiting_rel_max,
-                    awaiting_rel_timeout,
+                    await_rel_timeout,
                     created_at
                    ]).
 
@@ -162,11 +139,13 @@
                      mqueue_len,
                      mqueue_max,
                      mqueue_dropped,
-                     awaiting_rel,
-                     awaiting_rel_max,
-                     enqueue_cnt
+                     next_pkt_id,
+                     awaiting_rel_cnt,
+                     awaiting_rel_max
                     ]).
 
+-define(DEFAULT_BATCH_N, 1000).
+
 %%--------------------------------------------------------------------
 %% Init a Session
 %%--------------------------------------------------------------------
@@ -183,7 +162,7 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
              retry_interval    = get_env(Zone, retry_interval, 0),
              awaiting_rel      = #{},
              max_awaiting_rel  = get_env(Zone, max_awaiting_rel, 100),
-             awaiting_rel_timeout = get_env(Zone, awaiting_rel_timeout, 3600*1000),
+             await_rel_timeout = get_env(Zone, await_rel_timeout, 300),
              created_at        = erlang:system_time(second)
             }.
 
@@ -200,11 +179,6 @@ init_mqueue(Zone) ->
 info(Session) ->
     maps:from_list(info(?INFO_KEYS, Session)).
 
-%% Get attrs of the session.
--spec(attrs(session()) -> emqx_types:attrs()).
-attrs(Session) ->
-    maps:from_list(info(?ATTR_KEYS, Session)).
-
 %% @doc Get stats of the session.
 -spec(stats(session()) -> emqx_types:stats()).
 stats(Session) -> info(?STATS_KEYS, Session).
@@ -239,16 +213,10 @@ info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
     AwaitingRel;
 info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
     maps:size(AwaitingRel);
-info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) ->
-    MaxAwaitingRel;
-info(awaiting_rel_timeout, #session{awaiting_rel_timeout = Timeout}) ->
+info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
+    Max;
+info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
-info(enqueue_cnt, #session{deliver_stats = undefined}) ->
-    0;
-info(enqueue_cnt, #session{deliver_stats = Stats}) ->
-    maps:get(enqueue_cnt, Stats, 0);
-info(deliver_stats, #session{deliver_stats = Stats}) ->
-    Stats;
 info(created_at, #session{created_at = CreatedAt}) ->
     CreatedAt.
 
@@ -292,35 +260,28 @@ redeliver(Session = #session{inflight = Inflight}) ->
 %% Client -> Broker: SUBSCRIBE
 %%--------------------------------------------------------------------
 
--spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session())
+-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(),
+                session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
-subscribe(ClientInfo, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) ->
-    case is_subscriptions_full(Session)
-        andalso (not maps:is_key(TopicFilter, Subs)) of
-        true  -> {error, ?RC_QUOTA_EXCEEDED};
+subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
+          Session = #session{subscriptions = Subs}) ->
+    IsNew = not maps:is_key(TopicFilter, Subs),
+    case IsNew andalso is_subscriptions_full(Session) of
         false ->
-            do_subscribe(ClientInfo, TopicFilter, SubOpts, Session)
+            ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
+            ok = emqx_hooks:run('session.subscribed',
+                                [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]),
+            {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
+        true -> {error, ?RC_QUOTA_EXCEEDED}
     end.
 
+-compile({inline, [is_subscriptions_full/1]}).
 is_subscriptions_full(#session{max_subscriptions = 0}) ->
     false;
-is_subscriptions_full(#session{max_subscriptions = MaxLimit,
-                               subscriptions = Subs}) ->
+is_subscriptions_full(#session{subscriptions = Subs,
+                               max_subscriptions = MaxLimit}) ->
     maps:size(Subs) >= MaxLimit.
 
--compile({inline, [do_subscribe/4]}).
-do_subscribe(Client = #{clientid := ClientId}, TopicFilter, SubOpts,
-             Session = #session{subscriptions = Subs}) ->
-    case IsNew = (not maps:is_key(TopicFilter, Subs)) of
-        true ->
-            ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts);
-        false ->
-            _ = emqx_broker:set_subopts(TopicFilter, SubOpts)
-    end,
-    ok = emqx_hooks:run('session.subscribed',
-                        [Client, TopicFilter, SubOpts#{is_new => IsNew}]),
-    {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}.
-
 %%--------------------------------------------------------------------
 %% Client -> Broker: UNSUBSCRIBE
 %%--------------------------------------------------------------------
@@ -342,39 +303,34 @@ unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -
 %%--------------------------------------------------------------------
 
 -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-      -> {ok, emqx_types:publish_result(), session()} |
-         {error, emqx_types:reason_code()}).
-publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
+      -> {ok, emqx_types:publish_result(), session()}
+       | {error, emqx_types:reason_code()}).
+publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
+        Session = #session{awaiting_rel = AwaitingRel}) ->
     case is_awaiting_full(Session) of
         false ->
-            do_publish(PacketId, Msg, Session);
-        true ->
-            {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
+            case maps:is_key(PacketId, AwaitingRel) of
+                false ->
+                    Results = emqx_broker:publish(Msg),
+                    AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
+                    {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
+                true ->
+                    {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+            end;
+        true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
     end;
 
 %% Publish QoS0/1 directly
 publish(_PacketId, Msg, Session) ->
     {ok, emqx_broker:publish(Msg), Session}.
 
+-compile({inline, [is_awaiting_full/1]}).
 is_awaiting_full(#session{max_awaiting_rel = 0}) ->
     false;
 is_awaiting_full(#session{awaiting_rel = AwaitingRel,
                           max_awaiting_rel = MaxLimit}) ->
     maps:size(AwaitingRel) >= MaxLimit.
 
--compile({inline, [do_publish/3]}).
-do_publish(PacketId, Msg = #message{timestamp = Ts},
-           Session = #session{awaiting_rel = AwaitingRel}) ->
-    case maps:is_key(PacketId, AwaitingRel) of
-        false ->
-            DeliverResults = emqx_broker:publish(Msg),
-            AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
-            Session1 = Session#session{awaiting_rel = AwaitingRel1},
-            {ok, DeliverResults, Session1};
-        true ->
-            {error, ?RC_PACKET_IDENTIFIER_IN_USE}
-    end.
-
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBACK
 %%--------------------------------------------------------------------
@@ -394,6 +350,7 @@ puback(PacketId, Session = #session{inflight = Inflight}) ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
     end.
 
+-compile({inline, [return_with/2]}).
 return_with(Msg, {ok, Session}) ->
     {ok, Msg, Session};
 return_with(Msg, {ok, Publishes, Session}) ->
@@ -404,7 +361,8 @@ return_with(Msg, {ok, Publishes, Session}) ->
 %%--------------------------------------------------------------------
 
 -spec(pubrec(emqx_types:packet_id(), session())
-      -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}).
+      -> {ok, emqx_types:message(), session()}
+       | {error, emqx_types:reason_code()}).
 pubrec(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
@@ -438,11 +396,13 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
       -> {ok, session()} | {ok, list(publish()), session()}
        | {error, emqx_types:reason_code()}).
 pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
-    case emqx_inflight:contain(PacketId, Inflight) of
-        true ->
+    case emqx_inflight:lookup(PacketId, Inflight) of
+        {value, {pubrel, _Ts}} ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
             dequeue(Session#session{inflight = Inflight1});
-        false ->
+        {value, _Other} ->
+            {error, ?RC_PACKET_IDENTIFIER_IN_USE};
+        none ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
     end.
 
@@ -481,47 +441,68 @@ batch_n(Inflight) ->
     end.
 
 %%--------------------------------------------------------------------
-%% Broker -> Client: Publish | Msg
+%% Broker -> Client: Deliver
 %%--------------------------------------------------------------------
 
-deliver(Delivers, Session = #session{subscriptions = Subs})
-  when is_list(Delivers) ->
-    Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session)
-            || {deliver, Topic, Msg} <- Delivers],
+-spec(deliver(list(emqx_types:deliver()), session())
+      -> {ok, session()} | {ok, list(publish()), session()}).
+deliver(Delivers, Session) ->
+    Msgs = lists:map(enrich_fun(Session), Delivers),
     deliver(Msgs, [], Session).
 
 deliver([], Publishes, Session) ->
     {ok, lists:reverse(Publishes), Session};
 
 deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
-    deliver(More, [{publish, undefined, Msg}|Acc], Session);
+    Publish = {undefined, maybe_ack(Msg)},
+    deliver(More, [Publish|Acc], Session);
 
 deliver([Msg = #message{qos = QoS}|More], Acc,
         Session = #session{next_pkt_id = PacketId, inflight = Inflight})
     when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
     case emqx_inflight:is_full(Inflight) of
         true ->
-            deliver(More, Acc, enqueue(Msg, Session));
+            Session1 = case maybe_nack(Msg) of
+                           true  -> Session;
+                           false -> enqueue(Msg, Session)
+                       end,
+            deliver(More, Acc, Session1);
         false ->
-            Publish = {publish, PacketId, Msg},
+            Publish = {PacketId, maybe_ack(Msg)},
             Session1 = await(PacketId, Msg, Session),
             deliver(More, [Publish|Acc], next_pkt_id(Session1))
     end.
 
-enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Delivers) ->
-    Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session)
-            || {deliver, Topic, Msg} <- Delivers],
+-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), session())
+      -> session()).
+enqueue(Delivers, Session) when is_list(Delivers) ->
+    Msgs = lists:map(enrich_fun(Session), Delivers),
     lists:foldl(fun enqueue/2, Session, Msgs);
 
-enqueue(Msg, Session = #session{mqueue = Q})
-  when is_record(Msg, message) ->
+%%TODO: how to handle the dropped msg?
+enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
     if is_record(Dropped, message) ->
            ?LOG(warning, "Dropped msg due to mqueue is full: ~s",
                 [emqx_message:format(Dropped)]);
        true -> ok
     end,
-    inc_deliver_stats(enqueue_cnt, Session#session{mqueue = NewQ}).
+    Session#session{mqueue = NewQ}.
+
+enrich_fun(Session = #session{subscriptions = Subs}) ->
+    fun({deliver, Topic, Msg}) ->
+            enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
+    end.
+
+maybe_ack(Msg) ->
+    case emqx_shared_sub:is_ack_required(Msg) of
+        true  -> emqx_shared_sub:maybe_ack(Msg);
+        false -> Msg
+    end.
+
+maybe_nack(Msg) ->
+    emqx_shared_sub:is_ack_required(Msg)
+      andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
 
 %%--------------------------------------------------------------------
 %% Awaiting ACK for QoS1/QoS2 Messages
@@ -540,26 +521,26 @@ get_subopts(Topic, SubMap) ->
         error -> []
     end.
 
-enrich_subopt([], Msg, _Session) -> Msg;
-enrich_subopt([{nl, 1}|Opts], Msg, Session) ->
-    enrich_subopt(Opts, emqx_message:set_flag(nl, Msg), Session);
-enrich_subopt([{nl, 0}|Opts], Msg, Session) ->
-    enrich_subopt(Opts, Msg, Session);
-enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
-              Session = #session{upgrade_qos= true}) ->
-    enrich_subopt(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
-enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
-              Session = #session{upgrade_qos= false}) ->
-    enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
-enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
-    enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session);
-enrich_subopt([{rap, 0}|Opts], Msg, Session) ->
-    enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session);
-enrich_subopt([{rap, 1}|Opts], Msg, Session) ->
-    enrich_subopt(Opts, Msg, Session);
-enrich_subopt([{subid, SubId}|Opts], Msg, Session) ->
+enrich_subopts([], Msg, _Session) -> Msg;
+enrich_subopts([{nl, 1}|Opts], Msg, Session) ->
+    enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session);
+enrich_subopts([{nl, 0}|Opts], Msg, Session) ->
+    enrich_subopts(Opts, Msg, Session);
+enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
+               Session = #session{upgrade_qos = true}) ->
+    enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
+enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
+               Session = #session{upgrade_qos = false}) ->
+    enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
+enrich_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
+    enrich_subopts(Opts, emqx_message:set_flag(retain, true, Msg), Session);
+enrich_subopts([{rap, 0}|Opts], Msg, Session) ->
+    enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
+enrich_subopts([{rap, 1}|Opts], Msg, Session) ->
+    enrich_subopts(Opts, Msg, Session);
+enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
     Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
-    enrich_subopt(Opts, Msg1, Session).
+    enrich_subopts(Opts, Msg1, Session).
 
 %%--------------------------------------------------------------------
 %% Retry Delivery
@@ -582,14 +563,15 @@ retry_delivery([], _Now, Acc, Session) ->
 retry_delivery([{PacketId, {Val, Ts}}|More], Now, Acc,
                Session = #session{retry_interval = Interval,
                                   inflight = Inflight}) ->
+    IntervalMs = Interval * 1000,
     %% Microseconds -> MilliSeconds
     Age = timer:now_diff(Now, Ts) div 1000,
     if
-        Age >= Interval ->
+        Age >= IntervalMs ->
             {Acc1, Inflight1} = retry_delivery(PacketId, Val, Now, Acc, Inflight),
             retry_delivery(More, Now, Acc1, Session#session{inflight = Inflight1});
         true ->
-            {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
+            {ok, lists:reverse(Acc), IntervalMs - max(0, Age), Session}
     end.
 
 retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
@@ -599,7 +581,7 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
             {Acc, emqx_inflight:delete(PacketId, Inflight)};
         false ->
             Msg1 = emqx_message:set_flag(dup, true, Msg),
-            {[{publish, PacketId, Msg1}|Acc],
+            {[{PacketId, Msg1}|Acc],
              emqx_inflight:update(PacketId, {Msg1, Now}, Inflight)}
     end;
 
@@ -624,11 +606,11 @@ expire_awaiting_rel([], _Now, Session) ->
 
 expire_awaiting_rel([{PacketId, Ts} | More], Now,
                     Session = #session{awaiting_rel = AwaitingRel,
-                                       awaiting_rel_timeout = Timeout}) ->
+                                       await_rel_timeout = Timeout}) ->
     case (timer:now_diff(Now, Ts) div 1000) of
-        Age when Age >= Timeout ->
+        Age when Age >= (Timeout * 1000) ->
             ok = emqx_metrics:inc('messages.qos2.expired'),
-            ?LOG(warning, "Dropped qos2 packet ~s for awaiting_rel_timeout", [PacketId]),
+            ?LOG(warning, "Dropped qos2 packet ~s due to await_rel timeout", [PacketId]),
             Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
             expire_awaiting_rel(More, Now, Session1);
         Age ->
@@ -649,11 +631,4 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
 %% Helper functions
 %%--------------------------------------------------------------------
 
-inc_deliver_stats(Key, Session) ->
-    inc_deliver_stats(Key, 1, Session).
-inc_deliver_stats(Key, I, Session = #session{deliver_stats = undefined}) ->
-    Session#session{deliver_stats = #{Key => I}};
-inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) ->
-    NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats),
-    Session#session{deliver_stats = NStats}.
 

+ 3 - 1
src/emqx_types.erl

@@ -64,7 +64,8 @@
              , message/0
              ]).
 
--export_type([ delivery/0
+-export_type([ deliver/0
+             , delivery/0
              , publish_result/0
              , deliver_result/0
              ]).
@@ -172,6 +173,7 @@
 -type(payload() :: binary() | iodata()).
 -type(message() :: #message{}).
 -type(banned() :: #banned{}).
+-type(deliver() :: {deliver, topic(), message()}).
 -type(delivery() :: #delivery{}).
 -type(deliver_result() :: ok | {error, term()}).
 -type(publish_result() :: [ {node(), topic(), deliver_result()}

+ 40 - 34
src/emqx_ws_connection.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT/WS Connection
+%% MQTT/WS|WSS Connection
 -module(emqx_ws_connection).
 
 -include("emqx.hrl").
@@ -86,7 +86,7 @@
 -type(state() :: #state{}).
 
 -define(ACTIVE_N, 100).
--define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 
@@ -99,15 +99,15 @@
 -spec(info(pid()|state()) -> emqx_types:infos()).
 info(WsPid) when is_pid(WsPid) ->
     call(WsPid, info);
-info(WsConn = #state{channel = Channel}) ->
+info(State = #state{channel = Channel}) ->
     ChanInfo = emqx_channel:info(Channel),
-    SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
-    maps:merge(ChanInfo, #{sockinfo => SockInfo}).
+    SockInfo = maps:from_list(
+                 info(?INFO_KEYS, State)),
+    ChanInfo#{sockinfo => SockInfo}.
 
-info(Keys, WsConn) when is_list(Keys) ->
-    [{Key, info(Key, WsConn)} || Key <- Keys];
-info(socktype, _State) ->
-    ws;
+info(Keys, State) when is_list(Keys) ->
+    [{Key, info(Key, State)} || Key <- Keys];
+info(socktype, _State) -> ws;
 info(peername, #state{peername = Peername}) ->
     Peername;
 info(sockname, #state{sockname = Sockname}) ->
@@ -123,11 +123,6 @@ info(channel, #state{channel = Channel}) ->
 info(stop_reason, #state{stop_reason = Reason}) ->
     Reason.
 
-attrs(State = #state{channel = Channel}) ->
-    ChanAttrs = emqx_channel:attrs(Channel),
-    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
-    maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
-
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(WsPid) when is_pid(WsPid) ->
     call(WsPid, stats);
@@ -286,9 +281,10 @@ websocket_info(rate_limit, State) ->
 websocket_info({check_gc, Stats}, State) ->
     {ok, check_oom(run_gc(Stats, State))};
 
-websocket_info({deliver, _Topic, _Msg} = Deliver, State = #state{channel = Channel}) ->
-    Delivers = [Deliver|emqx_misc:drain_deliver()],
-    Ret = emqx_channel:handle_out(Delivers, Channel),
+websocket_info(Deliver = {deliver, _Topic, _Msg},
+               State = #state{active_n = ActiveN, channel = Channel}) ->
+    Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
+    Ret = emqx_channel:handle_deliver(Delivers, Channel),
     handle_chan_return(Ret, State);
 
 websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef}) ->
@@ -300,8 +296,8 @@ websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef
 websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
     handle_timeout(TRef, Msg, State);
 
-websocket_info({close, Reason}, State) ->
-    stop({shutdown, Reason}, State);
+websocket_info(Close = {close, _Reason}, State) ->
+    handle_info(Close, State);
 
 websocket_info({shutdown, Reason}, State) ->
     stop({shutdown, Reason}, State);
@@ -316,7 +312,7 @@ websocket_close(Reason, State) ->
     ?LOG(debug, "WebSocket closed due to ~p~n", [Reason]),
     handle_info({sock_closed, Reason}, State).
 
-terminate(SockError, _Req, #state{channel = Channel,
+terminate(SockError, _Req, #state{channel    = Channel,
                                   stop_reason = Reason}) ->
     ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]),
     emqx_channel:terminate(Reason, Channel).
@@ -349,16 +345,26 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    ok = emqx_cm:register_channel(ClientId),
-    ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
-    ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
+handle_info({connack, ConnAck}, State) ->
     reply(enqueue(ConnAck, State));
 
-handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    emqx_cm:set_chan_attrs(ClientId, attrs(State)),
+handle_info({close, Reason}, State) ->
+    stop({shutdown, Reason}, State);
+
+handle_info({event, connected}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:register_channel(ClientId, info(State), stats(State)),
+    reply(State);
+
+handle_info({event, disconnected}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_info(ClientId, info(State)),
+    emqx_cm:connection_closed(ClientId),
+    reply(State);
+
+handle_info({event, _Other}, State = #state{channel = Channel}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_info(ClientId, info(State)),
     emqx_cm:set_chan_stats(ClientId, stats(State)),
     reply(State);
 
@@ -376,10 +382,10 @@ handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
     RecvOct = emqx_pd:get_counter(recv_oct),
     handle_timeout(TRef, {keepalive, RecvOct}, State);
 
-handle_timeout(TRef, emit_stats, State = #state{channel = Channel,
-                                                stats_timer = TRef}) ->
-    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
-    (ClientId =/= undefined) andalso emqx_cm:set_chan_stats(ClientId, stats(State)),
+handle_timeout(TRef, emit_stats, State =
+               #state{channel = Channel, stats_timer = TRef}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_stats(ClientId, stats(State)),
     reply(State#state{stats_timer = undefined});
 
 handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
@@ -561,8 +567,8 @@ reply(Packet, State) when is_record(Packet, mqtt_packet) ->
     reply(enqueue(Packet, State));
 reply({outgoing, Packets}, State) ->
     reply(enqueue(Packets, State));
-reply(Close = {close, _Reason}, State) ->
-    self() ! Close,
+reply(Other, State) when is_tuple(Other) ->
+    self() ! Other,
     reply(State);
 
 reply([], State) ->

+ 2 - 2
test/emqx_broker_SUITE.erl

@@ -61,7 +61,7 @@ t_subopts(_) ->
     ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
     ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}),
-    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
     ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})),
     ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
     emqx_broker:unsubscribe(<<"topic">>).
@@ -210,4 +210,4 @@ recv_msgs(Count, Msgs) ->
         _Other -> recv_msgs(Count, Msgs)
     after 100 ->
         Msgs
-    end.
+    end.

+ 82 - 76
test/emqx_channel_SUITE.erl

@@ -91,11 +91,10 @@ t_chan_info(_) ->
      } = emqx_channel:info(channel()),
     ?assertEqual(clientinfo(), ClientInfo).
 
-t_chan_attrs(_) ->
-    #{conn_state := connected} = emqx_channel:attrs(channel()).
-
 t_chan_caps(_) ->
-    _Caps = emqx_channel:caps(channel()).
+    Caps = emqx_mqtt_caps:default(),
+    ?assertEqual(Caps#{max_packet_size => 1048576},
+                 emqx_channel:caps(channel())).
 
 %%--------------------------------------------------------------------
 %% Test cases for channel init
@@ -114,8 +113,9 @@ t_handle_in_connect_packet_sucess(_) ->
                      fun(true, _ClientInfo, _ConnInfo) ->
                              {ok, #{session => session(), present => false}}
                      end),
-    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0)}], Channel}
-        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel(#{conn_state => idle})),
+    IdleChannel = channel(#{conn_state => idle}),
+    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
+      = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
     ClientInfo = emqx_channel:info(clientinfo, Channel),
     ?assertMatch(#{clientid := <<"clientid">>,
                    username := <<"username">>
@@ -124,8 +124,9 @@ t_handle_in_connect_packet_sucess(_) ->
 
 t_handle_in_unexpected_connect_packet(_) ->
     Channel = emqx_channel:set_field(conn_state, connected, channel()),
-    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Channel}
-        = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
+    Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
+    {ok, [{outgoing, Packet}, {close, protocol_error}], Channel}
+      = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
 
 t_handle_in_qos0_publish(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
@@ -144,7 +145,7 @@ t_handle_in_qos1_publish(_) ->
 
 t_handle_in_qos2_publish(_) ->
     ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
-    ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end),
+    ok = meck:expect(emqx_session, info, fun(await_rel_timeout, _Session) -> 300 end),
     Channel = channel(#{conn_state => connected}),
     Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
     {ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
@@ -236,13 +237,12 @@ t_handle_in_pubcomp_not_found_error(_) ->
 
 t_handle_in_subscribe(_) ->
     ok = meck:expect(emqx_session, subscribe,
-                     fun(_, _, _, Session) ->
-                             {ok, Session}
-                     end),
+                     fun(_, _, _, Session) -> {ok, Session} end),
     Channel = channel(#{conn_state => connected}),
     TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
     Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
-    {ok, ?SUBACK_PACKET(1, [?QOS_0]), _} = emqx_channel:handle_in(Subscribe, Channel).
+    Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_0])}, {event, updated}],
+    {ok, Replies, _Chan} = emqx_channel:handle_in(Subscribe, Channel).
 
 t_handle_in_unsubscribe(_) ->
     ok = meck:expect(emqx_session, unsubscribe,
@@ -250,61 +250,65 @@ t_handle_in_unsubscribe(_) ->
                              {ok, Session}
                      end),
     Channel = channel(#{conn_state => connected}),
-    UnsubPkt = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]),
-    {ok, ?UNSUBACK_PACKET(1), _} = emqx_channel:handle_in(UnsubPkt, Channel).
+    {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan}
+      = emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
 
 t_handle_in_pingreq(_) ->
     {ok, ?PACKET(?PINGRESP), _Channel}
         = emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
 
 t_handle_in_disconnect(_) ->
+    Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
     Channel = channel(#{conn_state => connected}),
-    {shutdown, normal, Channel1} = emqx_channel:handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
+    {ok, {close, normal}, Channel1} = emqx_channel:handle_in(Packet, Channel),
     ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)).
 
 t_handle_in_auth(_) ->
     Channel = channel(#{conn_state => connected}),
     Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
-    {shutdown, implementation_specific_error, Packet, Channel}
-        = emqx_channel:handle_in(?AUTH_PACKET(), Channel).
+    {ok, [{outgoing, Packet},
+          {close, implementation_specific_error}], Channel}
+    = emqx_channel:handle_in(?AUTH_PACKET(), Channel).
 
 t_handle_in_frame_error(_) ->
     IdleChannel = channel(#{conn_state => idle}),
     {shutdown, frame_too_large, _}
-        = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
+      = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
     ConnectingChan =  channel(#{conn_state => connecting}),
-    {shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _}
-        = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
+    ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
+    {shutdown, frame_too_large, ConnackPacket, _}
+      = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
+    DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET),
     ConnectedChan = channel(#{conn_state => connected}),
-    {shutdown, malformed_Packet, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
-        = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
+    {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _}
+      = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
     DisconnectedChan = channel(#{conn_state => disconnected}),
     {ok, DisconnectedChan}
-        = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
+      = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
 
-%% TODO:
 t_handle_in_expected_packet(_) ->
-    {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _Chan}
-        = emqx_channel:handle_in(packet, channel()).
+    Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
+    {ok, [{outgoing, Packet}, {close, protocol_error}], _Chan}
+      = emqx_channel:handle_in(packet, channel()).
 
 t_process_connect(_) ->
     ok = meck:expect(emqx_cm, open_session,
                      fun(true, _ClientInfo, _ConnInfo) ->
                              {ok, #{session => session(), present => false}}
                      end),
-    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Channel}
-        = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
+    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan}
+      = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
 
-t_handle_publish_qos0(_) ->
+t_process_publish_qos0(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
-    {ok, _Channel} = emqx_channel:handle_publish(Publish, channel()).
+    {ok, _Channel} = emqx_channel:process_publish(Publish, channel()).
 
 t_process_publish_qos1(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
-    Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
+    Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
     {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
-        = emqx_channel:process_publish(1, Msg, channel()).
+      = emqx_channel:process_publish(Publish, channel()).
 
 t_process_subscribe(_) ->
     ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
@@ -317,55 +321,57 @@ t_process_unsubscribe(_) ->
     {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()).
 
 %%--------------------------------------------------------------------
-%% Test cases for handle_out
+%% Test cases for handle_deliver
 %%--------------------------------------------------------------------
 
-t_handle_out_delivers(_) ->
+t_handle_deliver(_) ->
     WithPacketId = fun(Msgs) ->
                            lists:zip(lists:seq(1, length(Msgs)), Msgs)
                    end,
     ok = meck:expect(emqx_session, deliver,
                      fun(Delivers, Session) ->
-                             Msgs = [Msg || {deliver, _, Msg} <- Delivers],
-                             Publishes = [{publish, PacketId, Msg}
-                                          || {PacketId, Msg} <- WithPacketId(Msgs)],
+                             Publishes = WithPacketId([Msg || {deliver, _, Msg} <- Delivers]),
                              {ok, Publishes, Session}
                      end),
-    ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20000 end),
+    ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20 end),
     Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
     Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
     Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
-    {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, channel()),
+    {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()),
     ?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
 
-t_handle_out_publishes(_) ->
+%%--------------------------------------------------------------------
+%% Test cases for handle_out
+%%--------------------------------------------------------------------
+
+t_handle_out_publish(_) ->
     Channel = channel(#{conn_state => connected}),
-    Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
-    Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
+    Pub0 = {undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
+    Pub1 = {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
     {ok, {outgoing, Packets}, _NChannel}
-        = emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel),
+      = emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
     ?assertEqual(2, length(Packets)).
-    % ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
 
-t_handle_out_publish(_) ->
+t_handle_out_publish_1(_) ->
     Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
     {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
-        = emqx_channel:handle_out({publish, 1, Msg}, channel()).
+      = emqx_channel:handle_out(publish, [{1, Msg}], channel()).
 
 t_handle_out_publish_nl(_) ->
     ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
     Channel = channel(#{clientinfo => ClientInfo}),
     Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
-    Publish = {publish, 1, emqx_message:set_flag(nl, Msg)},
-    {ok, Channel} = emqx_channel:handle_out(Publish, Channel).
+    Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
+    {ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
 
 t_handle_out_connack_sucess(_) ->
-    {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, _SP, _)}], _Chan}
-        = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()).
+    {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
+      = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
+    ?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
 
 t_handle_out_connack_failure(_) ->
     {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
-        = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
+      = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
 
 t_handle_out_puback(_) ->
     Channel = channel(#{conn_state => connected}),
@@ -376,33 +382,33 @@ t_handle_out_puback(_) ->
 t_handle_out_pubrec(_) ->
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel}
-        = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
-    % ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
+      = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
 
 t_handle_out_pubrel(_) ->
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREL_PACKET(1), Channel1}
-        = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
+      = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
     {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2}
-        = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
-    % ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
+      = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
 
 t_handle_out_pubcomp(_) ->
     {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel}
-        = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
-    % ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
+      = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
 
 t_handle_out_suback(_) ->
-    {ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
-        = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
+    Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_2])}, {event, updated}],
+    {ok, Replies, _Channel}
+      = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
 
 t_handle_out_unsuback(_) ->
-    {ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel}
-        = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
+    Replies = [{outgoing, ?UNSUBACK_PACKET(1, [?RC_SUCCESS])}, {event, updated}],
+    {ok, Replies, _Channel}
+      = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
 
 t_handle_out_disconnect(_) ->
-    {shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan}
-        = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
+    Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
+    {ok, [{outgoing, Packet}, {close, normal}], _Chan}
+      = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
 
 t_handle_out_unexpected(_) ->
     {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
@@ -444,33 +450,33 @@ t_handle_info_unsubscribe(_) ->
     {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()).
 
 t_handle_info_sock_closed(_) ->
-    {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason},
-                                          channel(#{conn_state => disconnected})).
+    Channel = channel(#{conn_state => disconnected}),
+    {ok, Channel} = emqx_channel:handle_info({sock_closed, reason}, Channel).
 
 %%--------------------------------------------------------------------
 %% Test cases for handle_timeout
 %%--------------------------------------------------------------------
 
 t_handle_timeout_emit_stats(_) ->
-    ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
     TRef = make_ref(),
+    ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
     Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()),
     {ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel).
 
 t_handle_timeout_keepalive(_) ->
     TRef = make_ref(),
-    _Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
-    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()).
+    Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, Channel).
 
 t_handle_timeout_retry_delivery(_) ->
-    ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
     TRef = make_ref(),
-    _Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
-    {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, channel()).
+    ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
+    Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
+    {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
 
 t_handle_timeout_expire_awaiting_rel(_) ->
-    ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
     TRef = make_ref(),
+    ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
     Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
     {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
 
@@ -522,7 +528,7 @@ t_check_subscribe(_) ->
     ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()),
     ok = meck:unload(emqx_zone).
 
-t_enrich_caps(_) ->
+t_enrich_connack_caps(_) ->
     ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
     ok = meck:expect(emqx_mqtt_caps, get_caps,
                      fun(_Zone) ->
@@ -534,7 +540,7 @@ t_enrich_caps(_) ->
                           wildcard_subscription => true
                          }
                      end),
-    AckProps = emqx_channel:enrich_caps(#{}, channel()),
+    AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
     ?assertMatch(#{'Retain-Available' := 1,
                    'Maximum-Packet-Size' := 1024,
                    'Topic-Alias-Maximum' := 10,

+ 1 - 1
test/emqx_client_SUITE.erl

@@ -98,7 +98,7 @@ t_cm(_) ->
     {ok, C} = emqtt:start_link([{clientid, ClientId}]),
     {ok, _} = emqtt:connect(C),
     ct:sleep(500),
-    #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId),
+    #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId),
     emqtt:subscribe(C, <<"mytopic">>, 0),
     ct:sleep(1200),
     Stats = emqx_cm:get_chan_stats(ClientId),

+ 12 - 8
test/emqx_cm_SUITE.erl

@@ -46,19 +46,23 @@ t_reg_unreg_channel(_) ->
     ok = emqx_cm:unregister_channel(<<"clientid">>),
     ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
 
-t_get_set_chan_attrs(_) ->
-    Attrs = #{proto_ver => 4, proto_name => <<"MQTT">>},
-    ok = emqx_cm:register_channel(<<"clientid">>),
-    ok = emqx_cm:set_chan_attrs(<<"clientid">>, Attrs),
-    ?assertEqual(Attrs, emqx_cm:get_chan_attrs(<<"clientid">>)),
+t_get_set_chan_info(_) ->
+    Info = #{proto_ver => 4, proto_name => <<"MQTT">>},
+    ok = emqx_cm:register_channel(<<"clientid">>, Info, []),
+    ?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)),
+    Info1 = Info#{proto_ver => 5},
+    true = emqx_cm:set_chan_info(<<"clientid">>, Info1),
+    ?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)),
     ok = emqx_cm:unregister_channel(<<"clientid">>),
-    ?assertEqual(undefined, emqx_cm:get_chan_attrs(<<"clientid">>)).
+    ?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)).
 
 t_get_set_chan_stats(_) ->
     Stats = [{recv_oct, 10}, {send_oct, 8}],
-    ok = emqx_cm:register_channel(<<"clientid">>),
-    ok = emqx_cm:set_chan_stats(<<"clientid">>, Stats),
+    ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats),
     ?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
+    Stats1 = [{recv_oct, 10}|Stats],
+    true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
+    ?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)),
     ok = emqx_cm:unregister_channel(<<"clientid">>),
     ?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
 

+ 2 - 3
test/emqx_connection_SUITE.erl

@@ -112,7 +112,7 @@ t_start_link_exit_on_activate(_) ->
 t_get_conn_info(_) ->
     with_connection(fun(CPid) ->
                             #{sockinfo := SockInfo} = emqx_connection:info(CPid),
-                            ?assertEqual(#{active_n => 100,limiter => undefined,
+                            ?assertEqual(#{active_n => 100,
                                            peername => {{127,0,0,1},3456},
                                            sockname => {{127,0,0,1},1883},
                                            sockstate => running,
@@ -218,8 +218,7 @@ t_handle_sock_closed(_) ->
                                              end),
                             CPid ! {tcp_closed, sock},
                             timer:sleep(100),
-                            %%TODO: closed?
-                            trap_exit(CPid, {shutdown, closed})
+                            trap_exit(CPid, {shutdown, tcp_closed})
                     end, #{trap_exit => true}).
 
 t_handle_outgoing(_) ->

+ 1 - 1
test/emqx_misc_SUITE.erl

@@ -102,7 +102,7 @@ t_drain_deliver(_) ->
     self() ! {deliver, t2, m2},
     ?assertEqual([{deliver, t1, m1},
                   {deliver, t2, m2}
-                 ], emqx_misc:drain_deliver()).
+                 ], emqx_misc:drain_deliver(2)).
 
 t_drain_down(_) ->
     {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),

+ 11 - 5
test/emqx_packet_SUITE.erl

@@ -82,7 +82,10 @@ t_check_publish(_) ->
     Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
     ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
     ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}),
-    ok = emqx_packet:check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias'=> 0}}),
+
+    {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(#mqtt_packet_publish{topic_name = <<>>,
+                                                                         properties = #{'Topic-Alias'=> 0}
+                                                                        }),
     {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<"payload">>)),
     {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"+/+">>, 1, #{}, <<"payload">>)),
     {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)),
@@ -146,13 +149,15 @@ t_check_connect(_) ->
                                     ?CONNECT_PACKET(#mqtt_packet_connect{
                                                        properties = #{'Receive-Maximum' => 0}}), Opts),
     ConnPkt7 = #mqtt_packet_connect{clientid   = <<>>, clean_start = false},
-    {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt7, Opts).                                                
+    {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt7, Opts).
 
 t_from_to_message(_) ->
     ExpectedMsg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
     ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg),
     ExpectedMsg2 = emqx_message:set_headers(#{peerhost => {127,0,0,1},
-                                              username => <<"test">>}, ExpectedMsg1),
+                                              protocol => mqtt,
+                                              username => <<"test">>
+                                             }, ExpectedMsg1),
     Pkt = #mqtt_packet{header = #mqtt_packet_header{type   = ?PUBLISH,
                                                     qos    = ?QOS_0,
                                                     retain = false,
@@ -161,7 +166,8 @@ t_from_to_message(_) ->
                                                        packet_id  = 10,
                                                        properties = #{}},
                        payload = <<"payload">>},
-    MsgFromPkt = emqx_packet:to_message(#{clientid => <<"clientid">>,
+    MsgFromPkt = emqx_packet:to_message(#{protocol => mqtt,
+                                          clientid => <<"clientid">>,
                                           username => <<"test">>,
                                           peerhost => {127,0,0,1}}, Pkt),
     ?assertEqual(ExpectedMsg2, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg),
@@ -194,7 +200,7 @@ t_will_msg(_) ->
     Msg2 = emqx_packet:will_msg(Pkt2),
     ?assertEqual(<<"clientid">>, Msg2#message.from),
     ?assertEqual(<<"topic">>, Msg2#message.topic).
-    
+
 t_format(_) ->
     io:format("~s", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0}, variable = undefined})]),
     io:format("~s", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>})]),

+ 6 - 17
test/emqx_reason_codes_SUITE.erl

@@ -25,23 +25,10 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-% t_name(_) ->
-%     error('TODO').
-
-% t_text(_) ->
-%     error('TODO').
-
-% t_mqtt_frame_error(_) ->
-%     error('TODO').
-
-% t_connack_error(_) ->
-%     error('TODO').
-
-% t_compat(_) ->
-%     error('TODO').
-
-% t_formalized(_) ->
-%     error('TODO').
+t_frame_error(_) ->
+    ?assertEqual(?RC_PACKET_TOO_LARGE, emqx_reason_codes:frame_error(frame_too_large)),
+    ?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_packet_id)),
+    ?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_qos)).
 
 t_prop_name_text(_) ->
     ?assert(proper:quickcheck(prop_name_text(), prop_name_text(opts))).
@@ -77,6 +64,7 @@ prop_connack_error() ->
 %%--------------------------------------------------------------------
 %% Helper
 %%--------------------------------------------------------------------
+
 default_opts() ->
     default_opts([]).
 
@@ -161,3 +149,4 @@ mqttv5_version() ->
 
 mqttv3_version() ->
     oneof([?MQTT_PROTO_V3, ?MQTT_PROTO_V4]).
+

+ 14 - 19
test/emqx_session_SUITE.erl

@@ -59,7 +59,7 @@ t_session_init(_) ->
     ?assertEqual(0, emqx_session:info(retry_interval, Session)),
     ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
     ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
-    ?assertEqual(3600000, emqx_session:info(awaiting_rel_timeout, Session)),
+    ?assertEqual(300, emqx_session:info(await_rel_timeout, Session)),
     ?assert(is_integer(emqx_session:info(created_at, Session))).
 
 %%--------------------------------------------------------------------
@@ -67,28 +67,23 @@ t_session_init(_) ->
 %%--------------------------------------------------------------------
 
 t_session_info(_) ->
-    Info = emqx_session:info(session()),
-    ?assertMatch(#{subscriptions := #{},
-                   subscriptions_max := 0,
-                   upgrade_qos := false,
-                   inflight_max := 0,
+    ?assertMatch(#{subscriptions  := #{},
+                   upgrade_qos    := false,
                    retry_interval := 0,
-                   mqueue_len := 0,
-                   mqueue_max := 1000,
-                   mqueue_dropped := 0,
-                   next_pkt_id := 1,
-                   awaiting_rel := #{},
-                   awaiting_rel_max := 100,
-                   awaiting_rel_timeout := 3600000
-                  }, Info).
-
-t_session_attrs(_) ->
-    Attrs = emqx_session:attrs(session()),
-    io:format("~p~n", [Attrs]).
+                   await_rel_timeout := 300
+                  }, emqx_session:info(session())).
 
 t_session_stats(_) ->
     Stats = emqx_session:stats(session()),
-    io:format("~p~n", [Stats]).
+    ?assertMatch(#{subscriptions_max := 0,
+                   inflight_max      := 0,
+                   mqueue_len        := 0,
+                   mqueue_max        := 1000,
+                   mqueue_dropped    := 0,
+                   next_pkt_id       := 1,
+                   awaiting_rel_cnt  := 0,
+                   awaiting_rel_max  := 100
+                  }, maps:from_list(Stats)).
 
 %%--------------------------------------------------------------------
 %% Test cases for pub/sub

+ 1 - 1
test/emqx_ws_connection_SUITE.erl

@@ -138,7 +138,7 @@ t_websocket_info_incoming(_) ->
 
 t_websocket_info_deliver(_) ->
     with_ws_conn(fun(WsConn) ->
-                         ok = meck:expect(emqx_channel, handle_out,
+                         ok = meck:expect(emqx_channel, handle_deliver,
                                           fun(Delivers, Channel) ->
                                                   Packets = [emqx_message:to_packet(1, Msg) || {deliver, _, Msg} <- Delivers],
                                                   {ok, {outgoing, Packets}, Channel}

+ 3 - 3
test/emqx_zone_SUITE.erl

@@ -25,7 +25,7 @@
                {server_keepalive, 60},
                {upgrade_qos, false},
                {session_expiry_interval, 7200},
-               {retry_interval, 20000},
+               {retry_interval, 20},
                {mqueue_store_qos0, true},
                {mqueue_priorities, none},
                {mqueue_default_priority, highest},
@@ -43,7 +43,7 @@
                {enable_flapping_detect, false},
                {enable_ban, true},
                {enable_acl, true},
-               {await_rel_timeout, 300000},
+               {await_rel_timeout, 300},
                {acl_deny_action, ignore}
               ]).
 
@@ -101,4 +101,4 @@ t_uncovered_func(_) ->
     emqx_zone:stop().
 
 t_frame_options(_) ->
-    ?assertMatch(#{strict_mode := _, max_size := _ }, emqx_zone:mqtt_frame_options(zone)).
+    ?assertMatch(#{strict_mode := _, max_size := _ }, emqx_zone:mqtt_frame_options(zone)).