Просмотр исходного кода

Merge pull request #1758 from emqtt/emqx30-feng

Improve the hooks design for emqx 3.0
Feng Lee 7 лет назад
Родитель
Сommit
cd7f79ec04

+ 0 - 61
TODO

@@ -1,61 +0,0 @@
-
-## MQTT 5.0
-
-1. Topic Alias
-2. Subscriber ID
-3. Session ensure stats
-4. Message Expiration
-
-## Connection
-
-## WebSocket
-
-## Listeners
-
-## Protocol
-
-1. Global ACL cache with limited age and size?
-2. Whether to enable ACL for each zone?
-
-## Session
-
-## Bridges
-
-Config
-CLI
-Remote Bridge
-replay queue
-
-## Access Control
-
-  Global ACL Cache
-  Add ACL cache emqx_access_control module
-
-## Zone
-
-## Hooks
-
-The hooks design...
-
-## MQueue
-
-Bound Queue
-LastValue Queue
-Priority Queue
-
-## Supervisor tree
-
-KernelSup
-
-## Managment
-
-## Dashboard
-
-## Testcases
-
-1. Update the README.md
-2. Update the Documentation
-3. Shared subscription and dispatch strategy
-4. Remove lager syslog:
-   dep_lager_syslog = git https://github.com/basho/lager_syslog
-

+ 0 - 6
etc/emqx.conf

@@ -605,12 +605,6 @@ zone.external.max_awaiting_rel = 100
 ## Value: Duration
 zone.external.await_rel_timeout = 60s
 
-## Whether to ignore loop delivery of messages.
-##
-## Value: true | false
-## Default: false
-zone.external.ignore_loop_deliver = false
-
 ## Default session expiry interval for MQTT V3.1.1 connections.
 ##
 ## Value: Duration

+ 0 - 1
priv/emqx.schema

@@ -768,7 +768,6 @@ end}.
 
 %% @doc Ignore loop delivery of messages
 {mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [
-  {default, false},
   {datatype, {enum, [true, false]}}
 ]}.
 

+ 2 - 4
src/emqx.erl

@@ -84,10 +84,8 @@ subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)->
     {SubPid, SubId} = Subscriber,
     emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
 
-%% @doc Publish Message
--spec(publish(message()) -> {ok, delivery()} | {error, term()}).
-publish(Msg) ->
-    emqx_broker:publish(Msg).
+-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
+publish(Msg) -> emqx_broker:publish(Msg).
 
 -spec(unsubscribe(topic() | string()) -> ok | {error, term()}).
 unsubscribe(Topic) ->

+ 16 - 14
src/emqx_broker.erl

@@ -143,16 +143,18 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -
 %% Publish
 %%------------------------------------------------------------------------------
 
--spec(publish(message()) -> delivery()).
+-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
 publish(Msg) when is_record(Msg, message) ->
     _ = emqx_tracer:trace(publish, Msg),
-    case emqx_hooks:run('message.publish', [], Msg) of
-        {ok, Msg1 = #message{topic = Topic}} ->
-            route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1));
-        {stop, Msg1} ->
-            emqx_logger:warning("Stop publishing: ~p", [Msg]), delivery(Msg1)
-    end.
-
+    {ok, case emqx_hooks:run('message.publish', [], Msg) of
+             {ok, Msg1 = #message{topic = Topic}} ->
+                   Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
+                   Delivery#delivery.flows;
+               {stop, _} ->
+                   emqx_logger:warning("Stop publishing: ~p", [Msg]), []
+         end}.
+
+-spec(safe_publish(message()) -> ok).
 %% Called internally
 safe_publish(Msg) when is_record(Msg, message) ->
     try
@@ -172,8 +174,8 @@ delivery(Msg) ->
 %%------------------------------------------------------------------------------
 
 route([], Delivery = #delivery{message = Msg}) ->
-    emqx_hooks:run('message.dropped', [undefined, Msg]),
-    dropped(Msg#message.topic), Delivery;
+    emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
+    inc_dropped_cnt(Msg#message.topic), Delivery;
 
 route([{To, Node}], Delivery) when Node =:= node() ->
     dispatch(To, Delivery);
@@ -215,8 +217,8 @@ forward(Node, To, Delivery) ->
 dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
     case subscribers(Topic) of
         [] ->
-            emqx_hooks:run('message.dropped', [undefined, Msg]),
-            dropped(Topic), Delivery;
+            emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
+            inc_dropped_cnt(Topic), Delivery;
         [Sub] -> %% optimize?
             dispatch(Sub, Topic, Msg),
             Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]};
@@ -232,9 +234,9 @@ dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) ->
 dispatch({share, _Group, _Sub}, _Topic, _Msg) ->
     ignored.
 
-dropped(<<"$SYS/", _/binary>>) ->
+inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
     ok;
-dropped(_Topic) ->
+inc_dropped_cnt(_Topic) ->
     emqx_metrics:inc('messages/dropped').
 
 -spec(subscribers(topic()) -> [subscriber()]).

+ 1 - 1
src/emqx_frame.erl

@@ -331,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
     {Value + Len * Multiplier, Rest}.
 
 parse_topic_filters(subscribe, Bin) ->
-    [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0, subid => 0}}
+    [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0}}
      || <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
 
 parse_topic_filters(unsubscribe, Bin) ->

+ 16 - 0
src/emqx_message.erl

@@ -22,6 +22,7 @@
 -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
 -export([set_headers/2]).
 -export([get_header/2, get_header/3, set_header/3]).
+-export([format/1]).
 
 -spec(make(topic(), payload()) -> message()).
 make(Topic, Payload) ->
@@ -55,10 +56,14 @@ get_flag(Flag, #message{flags = Flags}, Default) ->
     maps:get(Flag, Flags, Default).
 
 -spec(set_flag(message_flag(), message()) -> message()).
+set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) ->
+    Msg#message{flags = #{Flag => true}};
 set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) ->
     Msg#message{flags = maps:put(Flag, true, Flags)}.
 
 -spec(set_flag(message_flag(), boolean() | integer(), message()) -> message()).
+set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) ->
+    Msg#message{flags = #{Flag => Val}};
 set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
     Msg#message{flags = maps:put(Flag, Val, Flags)}.
 
@@ -83,3 +88,14 @@ set_header(Hdr, Val, Msg = #message{headers = undefined}) ->
 set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
     Msg#message{headers = maps:put(Hdr, Val, Headers)}.
 
+format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
+    io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
+                  [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
+
+format(_, undefined) ->
+    "";
+format(flags, Flags) ->
+    io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
+format(headers, Headers) ->
+    io_lib:format("~p", [Headers]).
+

+ 18 - 20
src/emqx_mod_presence.erl

@@ -19,50 +19,48 @@
 -include("emqx.hrl").
 
 -export([load/1, unload/1]).
--export([on_client_connected/3, on_client_disconnected/3]).
+-export([on_client_connected/4, on_client_disconnected/3]).
 
 load(Env) ->
-    emqx:hook('client.connected',    fun ?MODULE:on_client_connected/3, [Env]),
-    emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
+    emqx_hooks:add('client.connected',    fun ?MODULE:on_client_connected/4, [Env]),
+    emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
 
-on_client_connected(ConnAck, Client = #client{id = ClientId,
-                                              username  = Username,
-                                              peername  = {IpAddr, _}
-                                              %%clean_sess = CleanSess,
-                                              %%proto_ver = ProtoVer
-                                             }, Env) ->
+on_client_connected(#{client_id := ClientId,
+                      username  := Username,
+                      peername  := {IpAddr, _}}, ConnAck, ConnInfo, Env) ->
     case emqx_json:safe_encode([{clientid, ClientId},
                                 {username, Username},
                                 {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
-                                %%{clean_sess, CleanSess}, %%TODO:: fixme later
-                                %%{protocol, ProtoVer},
+                                {clean_start, proplists:get_value(clean_start, ConnInfo)},
+                                {proto_ver, proplists:get_value(proto_ver, ConnInfo)},
+                                {proto_name, proplists:get_value(proto_name, ConnInfo)},
+                                {keepalive, proplists:get_value(keepalive, ConnInfo)},
                                 {connack, ConnAck},
-                                {ts, emqx_time:now_secs()}]) of
+                                {ts, os:system_time(second)}]) of
         {ok, Payload} ->
             emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
         {error, Reason} ->
             emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
-    end,
-    {ok, Client}.
+    end.
 
-on_client_disconnected(Reason, #client{id = ClientId, username = Username}, Env) ->
+on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) ->
     case emqx_json:safe_encode([{clientid, ClientId},
                                 {username, Username},
                                 {reason, reason(Reason)},
-                                {ts, emqx_time:now_secs()}]) of
+                                {ts, os:system_time(second)}]) of
         {ok, Payload} ->
             emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
         {error, Reason} ->
             emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
-    end, ok.
+    end.
 
 unload(_Env) ->
-    emqx:unhook('client.connected',    fun ?MODULE:on_client_connected/3),
-    emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3).
+    emqx_hooks:delete('client.connected',    fun ?MODULE:on_client_connected/4),
+    emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3).
 
 message(QoS, Topic, Payload) ->
     Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)),
-    emqx_message:set_flags(#{sys => true}, Msg).
+    emqx_message:set_flag(sys, Msg).
 
 topic(connected, ClientId) ->
     emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"]));

+ 11 - 28
src/emqx_mod_rewrite.erl

@@ -15,47 +15,31 @@
 -module(emqx_mod_rewrite).
 
 -include_lib("emqx.hrl").
-
 -include_lib("emqx_mqtt.hrl").
 
 -export([load/1, unload/1]).
 
--export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
+-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
 
 load(Rules0) ->
     Rules = compile(Rules0),
-    emqx:hook('client.subscribe',  fun ?MODULE:rewrite_subscribe/4, [Rules]),
-    emqx:hook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
-    emqx:hook('message.publish',   fun ?MODULE:rewrite_publish/2, [Rules]).
+    emqx_hooks:add('client.subscribe',  fun ?MODULE:rewrite_subscribe/4, [Rules]),
+    emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
+    emqx_hooks:add('message.publish',   fun ?MODULE:rewrite_publish/2, [Rules]).
 
-rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) ->
-    emqx_logger:info("Rewrite subscribe: ~p", [TopicTable]),
+rewrite_subscribe(_Credentials, TopicTable, Rules) ->
     {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
 
-rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) ->
-    emqx_logger:info("Rewrite unsubscribe: ~p", [TopicTable]),
+rewrite_unsubscribe(_Credentials, TopicTable, Rules) ->
     {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
 
 rewrite_publish(Message = #message{topic = Topic}, Rules) ->
-    %%TODO: this will not work if the client is always online.
-    RewriteTopic =
-    case get({rewrite, Topic}) of
-        undefined ->
-            DestTopic = match_rule(Topic, Rules),
-            put({rewrite, Topic}, DestTopic), DestTopic;
-        DestTopic ->
-            DestTopic
-        end,
-    {ok, Message#message{topic = RewriteTopic}}.
+    {ok, Message#message{topic = match_rule(Topic, Rules)}}.
 
 unload(_) ->
-    emqx:unhook('client.subscribe',  fun ?MODULE:rewrite_subscribe/4),
-    emqx:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4),
-    emqx:unhook('message.publish',   fun ?MODULE:rewrite_publish/2).
+    emqx_hooks:delete('client.subscribe',  fun ?MODULE:rewrite_subscribe/3),
+    emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
+    emqx_hooks:delete('message.publish',   fun ?MODULE:rewrite_publish/2).
 
 %%--------------------------------------------------------------------
 %% Internal functions
@@ -79,8 +63,7 @@ match_regx(Topic, MP, Dest) ->
                     fun({Var, Val}, Acc) ->
                         re:replace(Acc, Var, Val, [global])
                     end, Dest, Vars));
-        nomatch ->
-            Topic
+        nomatch -> Topic
     end.
 
 compile(Rules) ->

+ 9 - 15
src/emqx_mod_subscription.erl

@@ -17,32 +17,26 @@
 -behaviour(emqx_gen_mod).
 
 -include_lib("emqx.hrl").
-
 -include_lib("emqx_mqtt.hrl").
 
--export([load/1, on_client_connected/3, unload/1]).
-
--define(TAB, ?MODULE).
+-export([load/1, on_session_created/3, unload/1]).
 
 %%--------------------------------------------------------------------
 %% Load/Unload Hook
 %%--------------------------------------------------------------------
 
 load(Topics) ->
-    emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]).
-
-on_client_connected(RC, Client = #client{id = ClientId, pid = ClientPid, username = Username}, Topics)
-    when RC < 16#80 ->
-    Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,
-    TopicTable = [{Replace(Topic), QoS} || {Topic, QoS} <- Topics],
-    ClientPid ! {subscribe, TopicTable},
-    {ok, Client};
+    emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
 
-on_client_connected(_ConnAck, _Client, _State) ->
-    ok.
+on_session_created(#{client_id := ClientId}, SessInfo, Topics) ->
+    Username = proplists:get_value(username, SessInfo),
+    Replace = fun(Topic) ->
+                      rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
+              end,
+    emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]).
 
 unload(_) ->
-    emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3).
+    emqx_hooks:delete('session.created', fun ?MODULE:on_session_created/3).
 
 %%--------------------------------------------------------------------
 %% Internal functions

+ 52 - 0
src/emqx_mountpoint.erl

@@ -0,0 +1,52 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_mountpoint).
+
+-include("emqx.hrl").
+
+-export([mount/2, unmount/2]).
+-export([replvar/2]).
+
+-type(mountpoint() :: binary()).
+-export_type([mountpoint/0]).
+
+mount(undefined, Any) ->
+    Any;
+mount(MountPoint, Msg = #message{topic = Topic}) ->
+    Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
+
+mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
+    [{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
+
+unmount(undefined, Msg) ->
+    Msg;
+unmount(MountPoint, Msg = #message{topic = Topic}) ->
+    case catch split_binary(Topic, byte_size(MountPoint)) of
+        {MountPoint, Topic1} -> Msg#message{topic = Topic1};
+        _Other -> Msg
+    end.
+
+replvar(undefined, _Vars) ->
+    undefined;
+replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
+    lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
+
+feed_var({<<"%c">>, ClientId}, MountPoint) ->
+    emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
+feed_var({<<"%u">>, undefined}, MountPoint) ->
+    MountPoint;
+feed_var({<<"%u">>, Username}, MountPoint) ->
+    emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
+

+ 50 - 31
src/emqx_packet.erl

@@ -22,6 +22,7 @@
 -export([validate/1]).
 -export([format/1]).
 -export([to_message/2, from_message/2]).
+-export([will_msg/1]).
 
 %% @doc Protocol name of version
 -spec(protocol_name(mqtt_version()) -> binary()).
@@ -37,30 +38,40 @@ protocol_name(?MQTT_PROTO_V5) ->
 type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH ->
     lists:nth(Type, ?TYPE_NAMES).
 
+%%------------------------------------------------------------------------------
+%% Validate MQTT Packet
+%%------------------------------------------------------------------------------
+
 validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) ->
-    error(packet_empty_topic_filters);
+    error(topic_filters_invalid);
 validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) ->
     validate_packet_id(PacketId)
         andalso validate_properties(?SUBSCRIBE, Properties)
             andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters);
 
 validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) ->
-    error(packet_empty_topic_filters);
+    error(topic_filters_invalid);
 validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) ->
     validate_packet_id(PacketId)
         andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters);
 
+validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) ->
+    error(topic_name_invalid);
+validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) ->
+    (not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid);
+
 validate(_Packet) ->
     true.
 
 validate_packet_id(0) ->
-    error(bad_packet_id);
+    error(packet_id_invalid);
 validate_packet_id(_) ->
     true.
 
-validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := 0}) ->
-    error(bad_subscription_identifier);
-validate_properties(?SUBSCRIBE, _) ->
+validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I})
+    when I =< 0; I >= 16#FFFFFFF ->
+    error(subscription_identifier_invalid);
+validate_properties(_, _) ->
     true.
 
 validate_subscription({Topic, #{qos := QoS}}) ->
@@ -75,40 +86,48 @@ validate_qos(_) -> error(bad_qos).
 from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) ->
     Dup = emqx_message:get_flag(dup, Msg, false),
     Retain = emqx_message:get_flag(retain, Msg, false),
+    Publish = #mqtt_packet_publish{topic_name = Topic,
+                                   packet_id  = PacketId,
+                                   %% TODO: Properties
+                                   properties = #{}},
     #mqtt_packet{header = #mqtt_packet_header{type   = ?PUBLISH,
+                                              dup    = Dup,
                                               qos    = QoS,
-                                              retain = Retain,
-                                              dup    = Dup},
-                 variable = #mqtt_packet_publish{topic_name = Topic,
-                                                 packet_id  = PacketId,
-                                                 properties = #{}}, %%TODO:
-                 payload = Payload}.
+                                              retain = Retain},
+                 variable = Publish, payload = Payload}.
 
 %% @doc Message from Packet
--spec(to_message(client_id(), mqtt_packet()) -> message()).
-to_message(ClientId, #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
-                                                                 retain = Retain,
-                                                                 qos    = QoS,
-                                                                 dup    = Dup},
-                                  variable = #mqtt_packet_publish{topic_name = Topic,
-                                                                  properties = Props},
-                                  payload  = Payload}) ->
+-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()).
+to_message(#{client_id := ClientId, username := Username},
+           #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
+                                                       retain = Retain,
+                                                       qos    = QoS,
+                                                       dup    = Dup},
+                        variable = #mqtt_packet_publish{topic_name = Topic,
+                                                        properties = Props},
+                        payload  = Payload}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
     Msg#message{flags = #{dup => Dup, retain => Retain},
-                headers = if
-                              Props =:= undefined -> #{};
-                              true -> Props
-                          end};
+                headers = merge_props(#{username => Username}, Props)}.
 
-to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) ->
+-spec(will_msg(#mqtt_packet_connect{}) -> message()).
+will_msg(#mqtt_packet_connect{will_flag = false}) ->
     undefined;
-to_message(ClientId, #mqtt_packet_connect{will_retain  = Retain,
-                                          will_qos     = QoS,
-                                          will_topic   = Topic,
-                                          will_props   = Props,
-                                          will_payload = Payload}) ->
+will_msg(#mqtt_packet_connect{client_id    = ClientId,
+                              username     = Username,
+                              will_retain  = Retain,
+                              will_qos     = QoS,
+                              will_topic   = Topic,
+                              will_props   = Properties,
+                              will_payload = Payload}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
-    Msg#message{flags = #{qos => QoS, retain => Retain}, headers = Props}.
+    Msg#message{flags = #{dup => false, retain => Retain},
+                headers = merge_props(#{username => Username}, Properties)}.
+
+merge_props(Headers, undefined) ->
+    Headers;
+merge_props(Headers, Props) ->
+    maps:merge(Headers, Props).
 
 %% @doc Format packet
 -spec(format(mqtt_packet()) -> iolist()).

+ 146 - 132
src/emqx_protocol.erl

@@ -18,11 +18,14 @@
 -include("emqx_mqtt.hrl").
 
 -export([init/2, info/1, caps/1, stats/1]).
+-export([client_id/1]).
 -export([credentials/1]).
--export([client/1, client_id/1]).
--export([session/1]).
 -export([parser/1]).
--export([received/2, process/2, deliver/2, send/2]).
+-export([session/1]).
+-export([received/2]).
+-export([process_packet/2]).
+-export([deliver/2]).
+-export([send/2]).
 -export([shutdown/2]).
 
 -record(pstate, {
@@ -34,12 +37,13 @@
           proto_name,
           ackprops,
           client_id,
-          client_pid,
+          conn_pid,
           conn_props,
           ack_props,
           username,
           session,
           clean_start,
+          topic_aliases,
           packet_size,
           will_msg,
           keepalive,
@@ -54,9 +58,12 @@
          }).
 
 -type(state() :: #pstate{}).
-
 -export_type([state/0]).
 
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
 -define(LOG(Level, Format, Args, PState),
         emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format,
                           [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
@@ -75,10 +82,11 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
             proto_ver    = ?MQTT_PROTO_V4,
             proto_name   = <<"MQTT">>,
             client_id    = <<>>,
-            client_pid   = self(),
+            conn_pid     = self(),
             username     = init_username(Peercert, Options),
             is_super     = false,
             clean_start  = false,
+            topic_aliases = #{},
             packet_size  = emqx_zone:get_env(Zone, max_packet_size),
             mountpoint   = emqx_zone:get_env(Zone, mountpoint),
             is_bridge    = false,
@@ -104,13 +112,13 @@ set_username(_Username, PState) ->
 %%------------------------------------------------------------------------------
 
 info(#pstate{zone         = Zone,
+             client_id    = ClientId,
+             username     = Username,
              peername     = Peername,
              proto_ver    = ProtoVer,
              proto_name   = ProtoName,
-             conn_props   = ConnProps,
-             client_id    = ClientId,
-             username     = Username,
              clean_start  = CleanStart,
+             conn_props   = ConnProps,
              keepalive    = Keepalive,
              mountpoint   = Mountpoint,
              is_super     = IsSuper,
@@ -118,12 +126,12 @@ info(#pstate{zone         = Zone,
              connected    = Connected,
              connected_at = ConnectedAt}) ->
     [{zone, Zone},
+     {client_id, ClientId},
+     {username, Username},
      {peername, Peername},
      {proto_ver, ProtoVer},
      {proto_name, ProtoName},
      {conn_props, ConnProps},
-     {client_id, ClientId},
-     {username, Username},
      {clean_start, CleanStart},
      {keepalive, Keepalive},
      {mountpoint, Mountpoint},
@@ -135,6 +143,9 @@ info(#pstate{zone         = Zone,
 caps(#pstate{zone = Zone}) ->
     emqx_mqtt_caps:get_caps(Zone).
 
+client_id(#pstate{client_id = ClientId}) ->
+    ClientId.
+
 credentials(#pstate{zone       = Zone,
                     client_id  = ClientId,
                     username   = Username,
@@ -144,20 +155,6 @@ credentials(#pstate{zone       = Zone,
       username  => Username,
       peername  => Peername}.
 
-client(#pstate{zone       = Zone,
-               client_id  = ClientId,
-               client_pid = ClientPid,
-               peername   = Peername,
-               username   = Username}) ->
-    #client{id       = ClientId,
-            pid      = ClientPid,
-            zone     = Zone,
-            peername = Peername,
-            username = Username}.
-
-client_id(#pstate{client_id = ClientId}) ->
-    ClientId.
-
 stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
               send_stats = #{pkt := SendPkt, msg := SendMsg}}) ->
     [{recv_pkt, RecvPkt},
@@ -177,42 +174,77 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
 
 -spec(received(mqtt_packet(), state())
       -> {ok, state()} | {error, term()} | {error, term(), state()}).
-received(?PACKET(Type), PState = #pstate{connected = false})
-    when Type =/= ?CONNECT ->
+received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
     {error, proto_not_connected, PState};
 
 received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
-    {error, proto_bad_connect, PState};
+    {error, proto_unexpected_connect, PState};
 
 received(Packet = ?PACKET(Type), PState) ->
     trace(recv, Packet, PState),
     case catch emqx_packet:validate(Packet) of
         true ->
-            process(Packet, inc_stats(recv, Type, PState));
-        {'EXIT', {ReasonCode, _Stacktrace}} when is_integer(ReasonCode) ->
-            deliver({disconnect, ReasonCode}, PState),
-            {error, protocol_error, PState};
+            {Packet1, PState1} = preprocess_properties(Packet, PState),
+            process_packet(Packet1, inc_stats(recv, Type, PState1));
         {'EXIT', {Reason, _Stacktrace}} ->
             deliver({disconnect, ?RC_MALFORMED_PACKET}, PState),
             {error, Reason, PState}
     end.
 
 %%------------------------------------------------------------------------------
-%% Process Packet
+%% Preprocess MQTT Properties
 %%------------------------------------------------------------------------------
 
-process(?CONNECT_PACKET(
-           #mqtt_packet_connect{proto_name  = ProtoName,
-                                proto_ver   = ProtoVer,
-                                is_bridge   = IsBridge,
-                                clean_start = CleanStart,
-                                keepalive   = Keepalive,
-                                properties  = ConnProps,
-                                client_id   = ClientId,
-                                username    = Username,
-                                password    = Password} = Connect), PState) ->
+%% Subscription Identifier
+preprocess_properties(Packet = #mqtt_packet{
+                                  variable = Subscribe = #mqtt_packet_subscribe{
+                                                            properties    = #{'Subscription-Identifier' := SubId},
+                                                            topic_filters = TopicFilters
+                                                           }
+                                 },
+                      PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) ->
+    TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters],
+    {Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState};
+
+%% Topic Alias Mapping
+preprocess_properties(Packet = #mqtt_packet{
+                                  variable = Publish = #mqtt_packet_publish{
+                                                          topic_name = <<>>,
+                                                          properties = #{'Topic-Alias' := AliasId}}
+                                 },
+                      PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
+    {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
+                                     topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
+
+preprocess_properties(Packet = #mqtt_packet{
+                                  variable = #mqtt_packet_publish{
+                                                topic_name = Topic,
+                                                properties = #{'Topic-Alias' := AliasId}}
+                                 },
+                      PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
+    {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
+
+preprocess_properties(Packet, PState) ->
+    {Packet, PState}.
 
-    io:format("~p~n", [Connect]),
+%%------------------------------------------------------------------------------
+%% Process MQTT Packet
+%%------------------------------------------------------------------------------
+
+process_packet(?CONNECT_PACKET(
+                  #mqtt_packet_connect{proto_name  = ProtoName,
+                                       proto_ver   = ProtoVer,
+                                       is_bridge   = IsBridge,
+                                       clean_start = CleanStart,
+                                       keepalive   = Keepalive,
+                                       properties  = ConnProps,
+                                       client_id   = ClientId,
+                                       username    = Username,
+                                       password    = Password} = Connect), PState) ->
+
+    %% TODO: Mountpoint...
+    %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
+    WillMsg = emqx_packet:will_msg(Connect),
 
     PState1 = set_username(Username,
                            PState#pstate{client_id    = ClientId,
@@ -221,7 +253,7 @@ process(?CONNECT_PACKET(
                                          clean_start  = CleanStart,
                                          keepalive    = Keepalive,
                                          conn_props   = ConnProps,
-                                         will_msg     = willmsg(Connect, PState),
+                                         will_msg     = WillMsg,
                                          is_bridge    = IsBridge,
                                          connected    = true,
                                          connected_at = os:timestamp()}),
@@ -240,10 +272,8 @@ process(?CONNECT_PACKET(
                               ok = emqx_cm:register_connection(client_id(PState4), info(PState4)),
                               %% Start keepalive
                               start_keepalive(Keepalive, PState4),
-                              %% TODO: 'Run hooks' before open_session?
-                              emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)),
                               %% Success
-                              {?RC_SUCCESS, SP, replvar(PState4)};
+                              {?RC_SUCCESS, SP, PState4};
                           {error, Error} ->
                               ?LOG(error, "Failed to open session: ~p", [Error], PState1),
                               {?RC_UNSPECIFIED_ERROR, PState1}
@@ -256,7 +286,7 @@ process(?CONNECT_PACKET(
               {ReasonCode, PState1}
       end);
 
-process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
+process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
         {ok, PState1} ->
             do_publish(Packet, PState1);
@@ -265,7 +295,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
             {ok, PState}
     end;
 
-process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
+process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
     case check_publish(Packet, PState) of
         {ok, PState1} ->
             do_publish(Packet, PState1);
@@ -273,7 +303,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
             deliver({puback, PacketId, ReasonCode}, PState)
     end;
 
-process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
+process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
     case check_publish(Packet, PState) of
         {ok, PState1} ->
             do_publish(Packet, PState1);
@@ -281,30 +311,37 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
             deliver({pubrec, PacketId, ReasonCode}, PState)
     end;
 
-process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
-    ok = emqx_session:puback(SPid, PacketId, ReasonCode),
-    {ok, PState};
+process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
+    {ok = emqx_session:puback(SPid, PacketId, ReasonCode), PState};
 
-process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
-    ok = emqx_session:pubrec(SPid, PacketId, ReasonCode),
-    send(?PUBREL_PACKET(PacketId), PState);
+process_packet(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
+    case emqx_session:pubrec(SPid, PacketId, ReasonCode) of
+        ok ->
+            send(?PUBREL_PACKET(PacketId), PState);
+        {error, NotFound} ->
+            send(?PUBREL_PACKET(PacketId, NotFound), PState)
+    end;
 
-process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
-    ok = emqx_session:pubrel(SPid, PacketId, ReasonCode),
-    send(?PUBCOMP_PACKET(PacketId), PState);
+process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
+    case emqx_session:pubrel(SPid, PacketId, ReasonCode) of
+        ok ->
+            send(?PUBCOMP_PACKET(PacketId), PState);
+        {error, NotFound} ->
+            send(?PUBCOMP_PACKET(PacketId, NotFound), PState)
+    end;
 
-process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
-    ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode),
-    {ok, PState};
+process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
+    {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
 
-process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
-        PState = #pstate{client_id = ClientId, session = SPid}) ->
+process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
+               PState = #pstate{session = SPid, mountpoint = Mountpoint}) ->
     case check_subscribe(
            parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of
         {ok, TopicFilters} ->
-            case emqx_hooks:run('client.subscribe', [ClientId], TopicFilters) of
+            case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
                 {ok, TopicFilters1} ->
-                    ok = emqx_session:subscribe(SPid, PacketId, Properties, mount(TopicFilters1, PState)),
+                    ok = emqx_session:subscribe(SPid, PacketId, Properties,
+                                                emqx_mountpoint:mount(Mountpoint, TopicFilters1)),
                     {ok, PState};
                 {stop, _} ->
                     ReasonCodes = lists:duplicate(length(TopicFilters),
@@ -320,12 +357,13 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
             deliver({suback, PacketId, ReasonCodes}, PState)
     end;
 
-process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
-        PState = #pstate{client_id = ClientId, session = SPid}) ->
-    case emqx_hooks:run('client.unsubscribe', [ClientId],
+process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
+               PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
+    case emqx_hooks:run('client.unsubscribe', [credentials(PState)],
                         parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of
         {ok, TopicFilters} ->
-            ok = emqx_session:unsubscribe(SPid, PacketId, Properties, mount(TopicFilters, PState)),
+            ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
+                                          emqx_mountpoint:mount(MountPoint, TopicFilters)),
             {ok, PState};
         {stop, _Acc} ->
             ReasonCodes = lists:duplicate(length(RawTopicFilters),
@@ -333,21 +371,23 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
             deliver({unsuback, PacketId, ReasonCodes}, PState)
     end;
 
-process(?PACKET(?PINGREQ), PState) ->
+process_packet(?PACKET(?PINGREQ), PState) ->
     send(?PACKET(?PINGRESP), PState);
 
-process(?PACKET(?DISCONNECT), PState) ->
+process_packet(?PACKET(?DISCONNECT), PState) ->
     %% Clean willmsg
     {stop, normal, PState#pstate{will_msg = undefined}}.
 
 %%------------------------------------------------------------------------------
-%% ConnAck -> Client
+%% ConnAck --> Client
 %%------------------------------------------------------------------------------
 
 connack({?RC_SUCCESS, SP, PState}) ->
-    deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
+    emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]),
+    deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
 
 connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
+    emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]),
     _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
                                  ReasonCode;
                              true ->
@@ -360,20 +400,28 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
 %%------------------------------------------------------------------------------
 
 do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
-           PState = #pstate{client_id = ClientId, session = SPid}) ->
-    Msg = mount(emqx_packet:to_message(ClientId, Packet), PState),
-    _ = emqx_session:publish(SPid, PacketId, Msg),
-    puback(QoS, PacketId, PState).
+           PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
+    Msg = emqx_mountpoint:mount(MountPoint,
+                                emqx_packet:to_message(credentials(PState), Packet)),
+    puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState).
 
 %%------------------------------------------------------------------------------
 %% Puback -> Client
 %%------------------------------------------------------------------------------
 
-puback(?QOS_0, _PacketId, PState) ->
+puback(?QOS_0, _PacketId, _Result, PState) ->
     {ok, PState};
-puback(?QOS_1, PacketId, PState) ->
+puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
+    deliver({puback, PacketId, ReasonCode}, PState);
+puback(?QOS_1, PacketId, {ok, []}, PState) ->
+    deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
+puback(?QOS_1, PacketId, {ok, _}, PState) ->
     deliver({puback, PacketId, ?RC_SUCCESS}, PState);
-puback(?QOS_2, PacketId, PState) ->
+puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
+    deliver({pubrec, PacketId, ReasonCode}, PState);
+puback(?QOS_2, PacketId, {ok, []}, PState) ->
+    deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
+puback(?QOS_2, PacketId, {ok, _}, PState) ->
     deliver({pubrec, PacketId, ?RC_SUCCESS}, PState).
 
 %%------------------------------------------------------------------------------
@@ -386,10 +434,9 @@ deliver({connack, ReasonCode}, PState) ->
 deliver({connack, ReasonCode, SP}, PState) ->
     send(?CONNACK_PACKET(ReasonCode, SP), PState);
 
-deliver({publish, PacketId, Msg}, PState = #pstate{client_id = ClientId,
-                                                        is_bridge = IsBridge}) ->
-    _ = emqx_hooks:run('message.delivered', [ClientId], Msg),
-    Msg1 = unmount(clean_retain(IsBridge, Msg), PState),
+deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge  = IsBridge, mountpoint = MountPoint}) ->
+    _ = emqx_hooks:run('message.delivered', credentials(PState), Msg),
+    Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)),
     send(emqx_packet:from_message(PacketId, Msg1), PState);
 
 deliver({puback, PacketId, ReasonCode}, PState) ->
@@ -448,13 +495,13 @@ maybe_assign_client_id(PState) ->
 
 try_open_session(#pstate{zone        = Zone,
                          client_id   = ClientId,
-                         client_pid  = ClientPid,
+                         conn_pid    = ConnPid,
                          conn_props  = ConnProps,
                          username    = Username,
                          clean_start = CleanStart}) ->
     case emqx_sm:open_session(#{zone        => Zone,
                                 client_id   => ClientId,
-                                client_pid  => ClientPid,
+                                conn_pid    => ConnPid,
                                 username    => Username,
                                 clean_start => CleanStart,
                                 conn_props  => ConnProps}) of
@@ -595,16 +642,9 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
         true -> ok;
         false -> send_willmsg(WillMsg)
     end,
-    emqx_hooks:run('client.disconnected', [Error], client(PState)),
+    emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
     emqx_cm:unregister_connection(ClientId).
 
-willmsg(Packet, PState = #pstate{client_id = ClientId})
-    when is_record(Packet, mqtt_packet_connect) ->
-    case emqx_packet:to_message(ClientId, Packet) of
-        undefined -> undefined;
-        Msg -> mount(Msg, PState)
-    end.
-
 send_willmsg(undefined) ->
     ignore;
 send_willmsg(WillMsg) ->
@@ -620,14 +660,11 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
 %% Parse topic filters
 %%-----------------------------------------------------------------------------
 
-parse_topic_filters(?SUBSCRIBE, TopicFilters) ->
-    [begin
-         {Topic, TOpts} = emqx_topic:parse(RawTopic),
-         {Topic, maps:merge(SubOpts, TOpts)}
-     end || {RawTopic, SubOpts} <- TopicFilters];
+parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
+    [emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters];
 
-parse_topic_filters(?UNSUBSCRIBE, TopicFilters) ->
-    lists:map(fun emqx_topic:parse/1, TopicFilters).
+parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
+    lists:map(fun emqx_topic:parse/1, RawTopicFilters).
 
 %%-----------------------------------------------------------------------------
 %% The retained flag should be propagated for bridge.
@@ -641,37 +678,14 @@ clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}
 clean_retain(_IsBridge, Msg) ->
     Msg.
 
-%%-----------------------------------------------------------------------------
-%% Mount Point
-%%-----------------------------------------------------------------------------
-
-mount(Any, #pstate{mountpoint = undefined}) ->
-    Any;
-mount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) ->
-    Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
-mount(TopicFilters, #pstate{mountpoint = MountPoint}) when is_list(TopicFilters) ->
-    [{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
-
-unmount(Any, #pstate{mountpoint = undefined}) ->
-    Any;
-unmount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) ->
-    case catch split_binary(Topic, byte_size(MountPoint)) of
-        {MountPoint, Topic1} -> Msg#message{topic = Topic1};
-        _Other -> Msg
-    end.
+%%------------------------------------------------------------------------------
+%% Update mountpoint
 
-replvar(PState = #pstate{mountpoint = undefined}) ->
+update_mountpoint(PState = #pstate{mountpoint = undefined}) ->
     PState;
-replvar(PState = #pstate{client_id = ClientId, username = Username, mountpoint = MountPoint}) ->
-    Vars = [{<<"%c">>, ClientId}, {<<"%u">>, Username}],
-    PState#pstate{mountpoint = lists:foldl(fun feed_var/2, MountPoint, Vars)}.
-
-feed_var({<<"%c">>, ClientId}, MountPoint) ->
-    emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
-feed_var({<<"%u">>, undefined}, MountPoint) ->
-    MountPoint;
-feed_var({<<"%u">>, Username}, MountPoint) ->
-    emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
+update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
+    PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}.
 
 sp(true)  -> 1;
 sp(false) -> 0.
+

+ 161 - 194
src/emqx_session.erl

@@ -73,11 +73,11 @@
           %% Username
           username :: binary() | undefined,
 
-          %% Client pid binding with session
-          client_pid :: pid(),
+          %% Connection pid binding with session
+          conn_pid :: pid(),
 
-          %% Old client Pid that has been kickout
-          old_client_pid :: pid(),
+          %% Old Connection Pid that has been kickout
+          old_conn_pid :: pid(),
 
           %% Next packet id of the session
           next_pkt_id = 1 :: mqtt_packet_id(),
@@ -133,9 +133,6 @@
           %% Stats timer
           stats_timer  :: reference() | undefined,
 
-          %% Ignore loop deliver?
-          ignore_loop_deliver = false :: boolean(),
-
           %% TODO:
           deliver_stats = 0,
 
@@ -148,7 +145,7 @@
 
 -define(TIMEOUT, 60000).
 
--define(INFO_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
+-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid,
                     next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
                     max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
                     await_rel_timeout, expiry_interval, enable_stats, created_at]).
@@ -169,20 +166,17 @@ start_link(SessAttrs) ->
 
 -spec(subscribe(pid(), list({topic(), map()}) |
                 {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
-subscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
-    gen_server:cast(SPid, {subscribe, [begin
-                                           {Topic, Opts} = emqx_topic:parse(RawTopic),
-                                           {Topic, maps:merge(
-                                                     maps:merge(
-                                                       ?DEFAULT_SUBOPTS, SubOpts), Opts)}
-                                       end || {RawTopic, SubOpts} <- TopicFilters]}).
+subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
+    TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts))
+                    || {RawTopic, SubOpts} <- RawTopicFilters],
+    subscribe(SPid, undefined, #{}, TopicFilters).
 
 %% for mqtt 5.0
 subscribe(SPid, PacketId, Properties, TopicFilters) ->
     SubReq = {PacketId, Properties, TopicFilters},
     gen_server:cast(SPid, {subscribe, self(), SubReq}).
 
--spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, delivery()} | {error, term()}).
+-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}).
 publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
     %% Publish QoS0 message to broker directly
     emqx_broker:publish(Msg);
@@ -202,27 +196,29 @@ puback(SPid, PacketId) ->
 puback(SPid, PacketId, ReasonCode) ->
     gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
 
--spec(pubrec(pid(), mqtt_packet_id()) -> ok).
+-spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}).
 pubrec(SPid, PacketId) ->
-    gen_server:cast(SPid, {pubrec, PacketId}).
+    pubrec(SPid, PacketId, ?RC_SUCCESS).
 
+-spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code())
+      -> ok | {error, mqtt_reason_code()}).
 pubrec(SPid, PacketId, ReasonCode) ->
-    gen_server:cast(SPid, {pubrec, PacketId, ReasonCode}).
+    gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity).
 
--spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
+-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code())
+      -> ok | {error, mqtt_reason_code()}).
 pubrel(SPid, PacketId, ReasonCode) ->
-    gen_server:cast(SPid, {pubrel, PacketId, ReasonCode}).
+    gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity).
 
 -spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
 pubcomp(SPid, PacketId, ReasonCode) ->
     gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
 
--spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
-unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
-    %%TODO: Parse the topic filters?
-    unsubscribe(SPid, undefined, #{}, TopicFilters).
+-spec(unsubscribe(pid(), topic_table()) -> ok).
+unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
+    unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)).
 
-%% TODO:...
+-spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok).
 unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
     UnsubReq = {PacketId, Properties, TopicFilters},
     gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
@@ -310,19 +306,18 @@ close(SPid) ->
 
 init(#{zone        := Zone,
        client_id   := ClientId,
-       client_pid  := ClientPid,
-       clean_start := CleanStart,
        username    := Username,
-       %% TODO:
+       conn_pid    := ConnPid,
+       clean_start := CleanStart,
        conn_props  := _ConnProps}) ->
     process_flag(trap_exit, true),
-    true = link(ClientPid),
+    true = link(ConnPid),
     MaxInflight = get_env(Zone, max_inflight),
     State = #state{clean_start       = CleanStart,
-                   binding           = binding(ClientPid),
+                   binding           = binding(ConnPid),
                    client_id         = ClientId,
-                   client_pid        = ClientPid,
                    username          = Username,
+                   conn_pid          = ConnPid,
                    subscriptions     = #{},
                    max_subscriptions = get_env(Zone, max_subscriptions, 0),
                    upgrade_qos       = get_env(Zone, upgrade_qos, false),
@@ -335,12 +330,11 @@ init(#{zone        := Zone,
                    max_awaiting_rel  = get_env(Zone, max_awaiting_rel),
                    expiry_interval   = get_env(Zone, session_expiry_interval),
                    enable_stats      = get_env(Zone, enable_stats, true),
-                   ignore_loop_deliver = get_env(Zone, ignore_loop_deliver, false),
                    deliver_stats      = 0,
                    enqueue_stats      = 0,
                    created_at        = os:timestamp()},
     emqx_sm:register_session(ClientId, info(State)),
-    emqx_hooks:run('session.created', [ClientId]),
+    emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     {ok, ensure_stats_timer(State), hibernate}.
 
 init_mqueue(Zone, ClientId) ->
@@ -351,29 +345,53 @@ init_mqueue(Zone, ClientId) ->
 binding(ClientPid) ->
     case node(ClientPid) =:= node() of true -> local; false -> remote end.
 
-handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) ->
-    ?LOG(warning, "Discarded by ~p", [ClientPid], State),
+handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) ->
+    ?LOG(warning, "Discarded by ~p", [ConnPid], State),
     {stop, {shutdown, discard}, ok, State};
 
-handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) ->
-    ?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State),
+handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
+    ?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State),
     {stop, {shutdown, conflict}, ok, State};
 
+%% PUBLISH:
 handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
-            State = #state{awaiting_rel      = AwaitingRel,
-                           await_rel_timer   = Timer,
-                           await_rel_timeout = Timeout}) ->
+            State = #state{awaiting_rel = AwaitingRel}) ->
     case is_awaiting_full(State) of
         false ->
-            State1 = case Timer == undefined of
-                         true  -> State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
-                         false -> State
-                     end,
-            reply(ok, State1#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)});
+            case maps:is_key(PacketId, AwaitingRel) of
+                true ->
+                    reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State);
+                false ->
+                    State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
+                    reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1))
+            end;
         true ->
             ?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State),
             emqx_metrics:inc('messages/qos2/dropped'),
-            reply({error, dropped}, State)
+            reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State)
+    end;
+
+%% PUBREC:
+handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
+    case emqx_inflight:contain(PacketId, Inflight) of
+        true ->
+            reply(ok, acked(pubrec, PacketId, State));
+        false ->
+            ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
+            emqx_metrics:inc('packets/pubrec/missed'),
+            reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
+    end;
+
+%% PUBREL:
+handle_call({pubrel, PacketId, _ReasonCode}, _From,
+            State = #state{awaiting_rel = AwaitingRel}) ->
+    case maps:take(PacketId, AwaitingRel) of
+        {_, AwaitingRel1} ->
+            reply(ok, State#state{awaiting_rel = AwaitingRel1});
+        error ->
+            ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
+            emqx_metrics:inc('packets/pubrel/missed'),
+            reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
     end;
 
 handle_call(info, _From, State) ->
@@ -390,57 +408,38 @@ handle_call(Req, _From, State) ->
     {reply, ignored, State}.
 
 %% SUBSCRIBE:
-handle_cast({subscribe, TopicFilters}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
-    Subscriptions1 = lists:foldl(
-                       fun({Topic, SubOpts}, SubMap) ->
-                               case maps:find(Topic, SubMap) of
-                                   {ok, _OldOpts} ->
-                                       emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
-                                       emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
-                                       ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State);
-                                   error ->
-                                       emqx_broker:subscribe(Topic, ClientId, SubOpts),
-                                       emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts])
-                               end,
-                               maps:put(Topic, SubOpts, SubMap)
-                       end, Subscriptions, TopicFilters),
-    {noreply, State#state{subscriptions = Subscriptions1}};
-
-handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}},
+handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
             State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
     {ReasonCodes, Subscriptions1} =
     lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
-                    {[QoS|RcAcc],
-                     case maps:find(Topic, SubMap) of
-                         {ok, SubOpts} ->
-                             ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State),
-                             SubMap;
-                         {ok, OldOpts} ->
-                             emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
-                             emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
-                             ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State),
-                             maps:put(Topic, with_subid(Properties, SubOpts), SubMap);
-                         error ->
-                             emqx_broker:subscribe(Topic, ClientId, SubOpts),
-                             emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
-                             maps:put(Topic, with_subid(Properties, SubOpts), SubMap)
-                     end}
+                    {[QoS|RcAcc], case maps:find(Topic, SubMap) of
+                                      {ok, SubOpts} ->
+                                          SubMap;
+                                      {ok, _SubOpts} ->
+                                          emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
+                                          emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
+                                          maps:put(Topic, SubOpts, SubMap);
+                                      error ->
+                                          emqx_broker:subscribe(Topic, ClientId, SubOpts),
+                                          emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
+                                          maps:put(Topic, SubOpts, SubMap)
+                                  end}
                 end, {[], Subscriptions}, TopicFilters),
-    suback(From, PacketId, ReasonCodes),
+    suback(FromPid, PacketId, ReasonCodes),
     {noreply, State#state{subscriptions = Subscriptions1}};
 
 %% UNSUBSCRIBE:
 handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
             State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
     {ReasonCodes, Subscriptions1} =
-    lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) ->
+    lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
                         case maps:find(Topic, SubMap) of
                             {ok, SubOpts} ->
-                                emqx_broker:unsubscribe(Topic, ClientId),
-                                emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
-                                {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)};
+                                ok = emqx_broker:unsubscribe(Topic, ClientId),
+                                emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
+                                {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
                             error ->
-                                {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap}
+                                {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
                         end
                 end, {[], Subscriptions}, TopicFilters),
     unsuback(From, PacketId, ReasonCodes),
@@ -452,73 +451,45 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
         true ->
             {noreply, dequeue(acked(puback, PacketId, State))};
         false ->
-            ?LOG(warning, "The PUBACK PacketId is not found: ~p", [PacketId], State),
+            ?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State),
             emqx_metrics:inc('packets/puback/missed'),
             {noreply, State}
     end;
 
-%% PUBREC: How to handle ReasonCode?
-handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
-    case emqx_inflight:contain(PacketId, Inflight) of
-        true ->
-            {noreply, acked(pubrec, PacketId, State)};
-        false ->
-            ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
-            emqx_metrics:inc('packets/pubrec/missed'),
-            {noreply, State}
-    end;
-
-%% PUBREL:
-handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) ->
-    {noreply,
-     case maps:take(PacketId, AwaitingRel) of
-         {Msg, AwaitingRel1} ->
-             %% Implement Qos2 by method A [MQTT 4.33]
-             %% Dispatch to subscriber when received PUBREL
-             emqx_broker:publish(Msg), %% FIXME:
-             maybe_gc(State#state{awaiting_rel = AwaitingRel1});
-         error ->
-             ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
-             emqx_metrics:inc('packets/pubrel/missed'),
-             State
-     end, hibernate};
-
 %% PUBCOMP:
 handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
     case emqx_inflight:contain(PacketId, Inflight) of
         true ->
             {noreply, dequeue(acked(pubcomp, PacketId, State))};
         false ->
-            ?LOG(warning, "The PUBCOMP Packet Identifier is not found: ~w", [PacketId], State),
+            ?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State),
             emqx_metrics:inc('packets/pubcomp/missed'),
             {noreply, State}
     end;
 
 %% RESUME:
-handle_cast({resume, ClientPid},
-            State = #state{client_id       = ClientId,
-                           client_pid      = OldClientPid,
-                           clean_start     = CleanStart,
-                           retry_timer     = RetryTimer,
-                           await_rel_timer = AwaitTimer,
-                           expiry_timer    = ExpireTimer}) ->
+handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
+                                              conn_pid        = OldConnPid,
+                                              clean_start     = CleanStart,
+                                              retry_timer     = RetryTimer,
+                                              await_rel_timer = AwaitTimer,
+                                              expiry_timer    = ExpireTimer}) ->
 
-    ?LOG(info, "Resumed by ~p ", [ClientPid], State),
+    ?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
 
     %% Cancel Timers
-    lists:foreach(fun emqx_misc:cancel_timer/1,
-                  [RetryTimer, AwaitTimer, ExpireTimer]),
+    lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
 
-    case kick(ClientId, OldClientPid, ClientPid) of
-        ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State);
+    case kick(ClientId, OldConnPid, ConnPid) of
+        ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State);
         ignore -> ok
     end,
 
-    true = link(ClientPid),
+    true = link(ConnPid),
 
-    State1 = State#state{client_pid      = ClientPid,
-                         binding         = binding(ClientPid),
-                         old_client_pid  = OldClientPid,
+    State1 = State#state{conn_pid        = ConnPid,
+                         binding         = binding(ConnPid),
+                         old_conn_pid    = OldConnPid,
                          clean_start     = false,
                          retry_timer     = undefined,
                          awaiting_rel    = #{},
@@ -526,14 +497,9 @@ handle_cast({resume, ClientPid},
                          expiry_timer    = undefined},
 
     %% Clean Session: true -> false?
-    if
-        CleanStart =:= true ->
-            ?LOG(error, "CleanSess changed to false.", [], State1);
-            %%TODO::
-            %%emqx_sm:register_session(ClientId, info(State1));
-        CleanStart =:= false ->
-            ok
-    end,
+    CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
+
+    emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]),
 
     %% Replay delivery and Dequeue pending messages
     {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
@@ -542,22 +508,25 @@ handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
+%% Batch dispatch
 handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
     {noreply, lists:foldl(fun(Msg, NewState) ->
                               element(2, handle_info({dispatch, Topic, Msg}, NewState))
                           end, State, Msgs)};
 
-%% Ignore messages delivered by self
-handle_info({dispatch, _Topic, #message{from = ClientId}},
-             State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
-    {noreply, State};
-
-%% Dispatch Message
-handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
-    {noreply, maybe_gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
+%% Dispatch message
+handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
+    {noreply, case maps:find(Topic, SubMap) of
+                  {ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
+                      run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
+                  {ok, #{nl := Nl, qos := QoS}} ->
+                      run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
+                  error ->
+                      dispatch(reset_dup(Msg), State)
+              end};
 
 %% Do nothing if the client has been disconnected.
-handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
+handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) ->
     {noreply, ensure_stats_timer(State#state{retry_timer = undefined})};
 
 handle_info({timeout, _Timer, retry_delivery}, State) ->
@@ -570,27 +539,25 @@ handle_info({timeout, _Timer, expired}, State) ->
     ?LOG(info, "Expired, shutdown now.", [], State),
     shutdown(expired, State);
 
-handle_info({'EXIT', ClientPid, _Reason},
-            State = #state{clean_start= true, client_pid = ClientPid}) ->
+handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) ->
     {stop, normal, State};
 
-handle_info({'EXIT', ClientPid, Reason},
-            State = #state{clean_start     = false,
-                           client_pid      = ClientPid,
-                           expiry_interval = Interval}) ->
-    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
+handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false,
+                                                      conn_pid    = ConnPid,
+                                                      expiry_interval = Interval}) ->
+    ?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State),
     ExpireTimer = emqx_misc:start_timer(Interval, expired),
-    State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
-    {noreply, State1, hibernate};
+    State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer},
+    {noreply, State1};
 
-handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
+handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) ->
     %% ignore
     {noreply, State, hibernate};
 
-handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
-    ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
-         [ClientPid, Pid, Reason], State),
-    {noreply, State, hibernate};
+handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
+    ?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
+         [ConnPid, Pid, Reason], State),
+    {noreply, State};
 
 handle_info(emit_stats, State = #state{client_id = ClientId}) ->
     emqx_sm:set_session_stats(ClientId, stats(State)),
@@ -600,8 +567,8 @@ handle_info(Info, State) ->
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(Reason, #state{client_id = ClientId, username = Username}) ->
-    emqx_hooks:run('session.terminated', [ClientId, Username, Reason]),
+terminate(Reason, #state{client_id = ClientId}) ->
+    emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
     emqx_sm:unregister_session(ClientId).
 
 code_change(_OldVsn, State, _Extra) ->
@@ -611,10 +578,6 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-with_subid(#{'Subscription-Identifier' := SubId}, Opts) ->
-    maps:put(subid, SubId, Opts);
-with_subid(_Props, Opts) -> Opts.
-
 suback(_From, undefined, _ReasonCodes) ->
     ignore;
 suback(From, PacketId, ReasonCodes) ->
@@ -726,9 +689,22 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen})
 %% Dispatch Messages
 %%------------------------------------------------------------------------------
 
+run_dispatch_steps([], Msg, State) ->
+    dispatch(Msg, State);
+run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) ->
+    State;
+run_dispatch_steps([{nl, 0}|Steps], Msg, State) ->
+    run_dispatch_steps(Steps, Msg, State);
+run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) ->
+    run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
+run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
+    run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State);
+run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
+    run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
+
 %% Enqueue message if the client has been disconnected
-dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) ->
-    case emqx_hooks:run('message.dropped', [ClientId, Msg]) of
+dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
+    case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of
         ok   -> enqueue_msg(Msg, State);
         stop -> State
     end;
@@ -761,12 +737,12 @@ redeliver({PacketId, Msg = #message{qos = QoS}}, State) ->
                          true -> emqx_message:set_flag(dup, Msg)
                       end, State);
 
-redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
-    Pid ! {deliver, {pubrel, PacketId}}.
+redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) ->
+    ConnPid ! {deliver, {pubrel, PacketId}}.
 
-deliver(PacketId, Msg, #state{client_pid = Pid, binding = local}) ->
+deliver(PacketId, Msg, #state{conn_pid = Pid, binding = local}) ->
     Pid ! {deliver, {publish, PacketId, Msg}};
-deliver(PacketId, Msg, #state{client_pid = Pid, binding = remote}) ->
+deliver(PacketId, Msg, #state{conn_pid = Pid, binding = remote}) ->
     emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]).
 
 %%------------------------------------------------------------------------------
@@ -783,24 +759,20 @@ await(PacketId, Msg, State = #state{inflight       = Inflight,
              end,
     State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}.
 
-acked(puback, PacketId, State = #state{client_id = ClientId,
-                                       username  = Username,
-                                       inflight  = Inflight}) ->
+acked(puback, PacketId, State = #state{client_id = ClientId, inflight  = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {publish, Msg, _Ts}} ->
-            emqx_hooks:run('message.acked', [ClientId, Username], Msg),
+            emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
             State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
         none ->
             ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
             State
     end;
 
-acked(pubrec, PacketId, State = #state{client_id = ClientId,
-                                       username  = Username,
-                                       inflight  = Inflight}) ->
+acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight  = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {publish, Msg, _Ts}} ->
-            emqx_hooks:run('message.acked', [ClientId, Username], Msg),
+            emqx_hooks:run('message.acked', [ClientId], Msg),
             State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
         {value, {pubrel, PacketId, _Ts}} ->
             ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
@@ -818,7 +790,7 @@ acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
 %%------------------------------------------------------------------------------
 
 %% Do nothing if client is disconnected
-dequeue(State = #state{client_pid = undefined}) ->
+dequeue(State = #state{conn_pid = undefined}) ->
     State;
 
 dequeue(State = #state{inflight = Inflight}) ->
@@ -836,19 +808,14 @@ dequeue2(State = #state{mqueue = Q}) ->
             dequeue(dispatch(Msg, State#state{mqueue = Q1}))
     end.
 
+
 %%------------------------------------------------------------------------------
-%% Tune QoS
-
-tune_qos(Topic, Msg = #message{qos = PubQoS},
-         #state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) ->
-    case maps:find(Topic, SubMap) of
-        {ok, #{qos := SubQoS}} when UpgradeQoS andalso (SubQoS > PubQoS) ->
-            Msg#message{qos = SubQoS};
-        {ok, #{qos := SubQoS}} when (not UpgradeQoS) andalso (SubQoS < PubQoS) ->
-            Msg#message{qos = SubQoS};
-        {ok, _} -> Msg;
-        error   -> Msg
-    end.
+%% Ensure timers
+
+ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
+     State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
+ensure_await_rel_timer(State) ->
+    State.
 
 %%------------------------------------------------------------------------------
 %% Reset Dup
@@ -888,5 +855,5 @@ reply(Reply, State) ->
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
 
-maybe_gc(State) -> State.
+%%TODO: maybe_gc(State) -> State.
 

+ 23 - 29
src/emqx_sm.erl

@@ -35,8 +35,6 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
--record(state, {session_pmon}).
-
 -define(SM, ?MODULE).
 
 %% ETS Tables
@@ -45,26 +43,22 @@
 -define(SESSION_ATTRS_TAB, emqx_session_attrs).
 -define(SESSION_STATS_TAB, emqx_session_stats).
 
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
+-spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?SM}, ?MODULE, [], []).
 
 %% @doc Open a session.
 -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
-open_session(Attrs = #{clean_start := true,
-                       client_id   := ClientId,
-                       client_pid  := ClientPid}) ->
+open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
     CleanStart = fun(_) ->
-                     ok = discard_session(ClientId, ClientPid),
+                     ok = discard_session(ClientId, ConnPid),
                      emqx_session_sup:start_session(Attrs)
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
 
-open_session(Attrs = #{clean_start := false,
-                       client_id   := ClientId,
-                       client_pid  := ClientPid}) ->
+open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
     ResumeStart = fun(_) ->
-                      case resume_session(ClientId, ClientPid) of
+                      case resume_session(ClientId, ConnPid) of
                           {ok, SPid} ->
                               {ok, SPid, true};
                           {error, not_found} ->
@@ -80,34 +74,33 @@ open_session(Attrs = #{clean_start := false,
 discard_session(ClientId) when is_binary(ClientId) ->
     discard_session(ClientId, self()).
 
-discard_session(ClientId, ClientPid) when is_binary(ClientId) ->
-    lists:foreach(
-      fun({_ClientId, SPid}) ->
-          case catch emqx_session:discard(SPid, ClientPid) of
-              {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
-                  emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
-              ok -> ok
-          end
-      end, lookup_session(ClientId)).
+discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
+    lists:foreach(fun({_ClientId, SPid}) ->
+                      case catch emqx_session:discard(SPid, ConnPid) of
+                          {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
+                              emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
+                          ok -> ok
+                      end
+                  end, lookup_session(ClientId)).
 
 %% @doc Try to resume a session.
 -spec(resume_session(client_id()) -> {ok, pid()} | {error, term()}).
 resume_session(ClientId) ->
     resume_session(ClientId, self()).
 
-resume_session(ClientId, ClientPid) ->
+resume_session(ClientId, ConnPid) ->
     case lookup_session(ClientId) of
         [] -> {error, not_found};
         [{_ClientId, SPid}] ->
-            ok = emqx_session:resume(SPid, ClientPid),
+            ok = emqx_session:resume(SPid, ConnPid),
             {ok, SPid};
         Sessions ->
             [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
             emqx_logger:error("[SM] More than one session found: ~p", [Sessions]),
             lists:foreach(fun({_, StalePid}) ->
-                              catch emqx_session:discard(StalePid, ClientPid)
+                              catch emqx_session:discard(StalePid, ConnPid)
                           end, StaleSessions),
-            ok = emqx_session:resume(SPid, ClientPid),
+            ok = emqx_session:resume(SPid, ConnPid),
             {ok, SPid}
     end.
 
@@ -224,11 +217,11 @@ handle_call(Req, _From, State) ->
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({notify, {registered, ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
-    {noreply, State#state{session_pmon = emqx_pmon:monitor(SPid, ClientId, PMon)}};
+handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) ->
+    {noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
 
-handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
-    {noreply, State#state{session_pmon = emqx_pmon:demonitor(SPid, PMon)}};
+handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) ->
+    {noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}};
 
 handle_cast(Msg, State) ->
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
@@ -236,7 +229,8 @@ handle_cast(Msg, State) ->
 
 handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) ->
     case emqx_pmon:find(DownPid, PMon) of
-        undefined -> {noreply, State};
+        undefined ->
+            {noreply, State};
         ClientId  ->
             unregister_session({ClientId, DownPid}),
             {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}}

+ 4 - 1
src/emqx_types.erl

@@ -19,7 +19,7 @@
 -export_type([startlink_ret/0]).
 -export_type([zone/0, client_id/0, username/0, password/0, peername/0,
               protocol/0, credentials/0]).
--export_type([payload/0]).
+-export_type([topic/0, payload/0, dispatches/0]).
 %%-export_type([payload/0, message/0, delivery/0]).
 
 -type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}).
@@ -36,7 +36,10 @@
                          zone      => zone(),
                          atom()    => term()}).
 
+-type(topic() :: binary()).
 -type(payload() :: binary() | iodata()).
 %-type(message() :: #message{}).
 %-type(delivery() :: #delivery{}).
 
+-type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]).
+