Prechádzať zdrojové kódy

Introduce the 'pipeline' design pattern

- Introduce the 'pipeline' design pattern in emqx_protocol module
- Reame the '{dispatch, ...' to '{deliver, ...' in emqx_broker module
- Rename type 'credentials' to 'client'
Feng Lee 6 rokov pred
rodič
commit
0a6468cf48

+ 2 - 2
src/emqx_broker.erl

@@ -274,13 +274,13 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
             Delivery;
         [Sub] -> %% optimize?
             Cnt = dispatch(Sub, Topic, Msg),
-            Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]};
+            Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]};
         Subs ->
             Cnt = lists:foldl(
                     fun(Sub, Acc) ->
                             dispatch(Sub, Topic, Msg) + Acc
                     end, 0, Subs),
-            Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}
+            Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]}
     end.
 
 dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->

+ 1 - 1
src/emqx_client.erl

@@ -205,7 +205,7 @@
                 | {nl,  boolean()}
                 | {qos, qos()}).
 
--type(reason_code() :: emqx_mqtt_types:reason_code()).
+-type(reason_code() :: emqx_types:reason_code()).
 
 -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
 

+ 7 - 9
src/emqx_cm.erl

@@ -42,7 +42,7 @@
         , set_chan_stats/2
         ]).
 
--export([ open_session/1
+-export([ open_session/3
         , discard_session/1
         , resume_session/1
         ]).
@@ -168,24 +168,22 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
     ok.
 
 %% @doc Open a session.
--spec(open_session(map()) -> {ok, emqx_session:session()}
-                           | {error, Reason :: term()}).
-open_session(Attrs = #{clean_start := true,
-                       client_id := ClientId}) ->
+-spec(open_session(boolean(), emqx_types:client(), map())
+      -> {ok, emqx_session:session()} | {error, Reason :: term()}).
+open_session(true, Client = #{client_id := ClientId}, Options) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId),
-                     {ok, emqx_session:init(Attrs), false}
+                     {ok, emqx_session:init(true, Client, Options), false}
                  end,
     emqx_cm_locker:trans(ClientId, CleanStart);
 
-open_session(Attrs = #{clean_start := false,
-                       client_id := ClientId}) ->
+open_session(false, Client = #{client_id := ClientId}, Options) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId) of
                           {ok, Session} ->
                               {ok, Session, true};
                           {error, not_found} ->
-                              {ok, emqx_session:init(Attrs), false}
+                              {ok, emqx_session:init(false, Client, Options), false}
                       end
                   end,
     emqx_cm_locker:trans(ClientId, ResumeStart).

+ 1 - 1
src/emqx_mod_acl_internal.erl

@@ -61,7 +61,7 @@ all_rules() ->
 %%--------------------------------------------------------------------
 
 %% @doc Check ACL
--spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
+-spec(check_acl(emqx_types:client(), emqx_types:pubsub(), emqx_topic:topic(),
                 emqx_access_rule:acl_result(), acl_rules())
       -> {ok, allow} | {ok, deny} | ok).
 check_acl(Client, PubSub, Topic, _AclResult, Rules) ->

+ 7 - 6
src/emqx_mod_rewrite.erl

@@ -32,9 +32,9 @@
         , unload/1
         ]).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Load/Unload
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 load(RawRules) ->
     Rules = compile(RawRules),
@@ -42,10 +42,10 @@ load(RawRules) ->
     emqx_hooks:add('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
     emqx_hooks:add('message.publish',    fun ?MODULE:rewrite_publish/2, [Rules]).
 
-rewrite_subscribe(_Credentials, TopicTable, Rules) ->
+rewrite_subscribe(_Client, TopicTable, Rules) ->
     {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
 
-rewrite_unsubscribe(_Credentials, TopicTable, Rules) ->
+rewrite_unsubscribe(_Client, TopicTable, Rules) ->
     {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
 
 rewrite_publish(Message = #message{topic = Topic}, Rules) ->
@@ -56,9 +56,9 @@ unload(_) ->
     emqx_hooks:del('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3),
     emqx_hooks:del('message.publish',    fun ?MODULE:rewrite_publish/2).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Internal functions
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 match_rule(Topic, []) ->
     Topic;
@@ -86,3 +86,4 @@ compile(Rules) ->
                   {ok, MP} = re:compile(Re),
                   {rewrite, Topic, MP, Dest}
               end, Rules).
+

+ 14 - 8
src/emqx_mqtt_caps.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc MQTTv5 capabilities
+%% @doc MQTTv5 Capabilities
 -module(emqx_mqtt_caps).
 
 -include("emqx.hrl").
@@ -28,14 +28,17 @@
 
 -export([default_caps/0]).
 
+-export_type([caps/0]).
+
 -type(caps() :: #{max_packet_size  => integer(),
                   max_clientid_len => integer(),
                   max_topic_alias  => integer(),
                   max_topic_levels => integer(),
                   max_qos_allowed  => emqx_types:qos(),
-                  mqtt_retain_available      => boolean(),
-                  mqtt_shared_subscription   => boolean(),
-                  mqtt_wildcard_subscription => boolean()}).
+                  mqtt_retain_available => boolean(),
+                  mqtt_shared_subscription  => boolean(),
+                  mqtt_wildcard_subscription => boolean()
+                 }).
 
 -define(UNLIMITED, 0).
 
@@ -44,18 +47,21 @@
                        {max_topic_alias,  ?UNLIMITED},
                        {max_topic_levels, ?UNLIMITED},
                        {max_qos_allowed,  ?QOS_2},
-                       {mqtt_retain_available,      true},
-                       {mqtt_shared_subscription,   true},
-                       {mqtt_wildcard_subscription, true}]).
+                       {mqtt_retain_available, true},
+                       {mqtt_shared_subscription, true},
+                       {mqtt_wildcard_subscription, true}
+                      ]).
 
 -define(PUBCAP_KEYS, [max_qos_allowed,
                       mqtt_retain_available,
                       max_topic_alias
                      ]).
+
 -define(SUBCAP_KEYS, [max_qos_allowed,
                       max_topic_levels,
                       mqtt_shared_subscription,
-                      mqtt_wildcard_subscription]).
+                      mqtt_wildcard_subscription
+                     ]).
 
 -spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}).
 check_pub(Zone, Props) when is_map(Props) ->

+ 1 - 1
src/emqx_packet.erl

@@ -142,7 +142,7 @@ publish_props(Headers) ->
                'Message-Expiry-Interval'], Headers).
 
 %% @doc Message from Packet
--spec(to_message(emqx_types:credentials(), emqx_ypes:packet())
+-spec(to_message(emqx_types:client(), emqx_ypes:packet())
       -> emqx_types:message()).
 to_message(#{client_id := ClientId, username := Username, peername := Peername},
            #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,

+ 356 - 205
src/emqx_protocol.erl

@@ -27,6 +27,7 @@
 -export([ info/1
         , info/2
         , attrs/1
+        , caps/1
         ]).
 
 -export([ init/2
@@ -44,14 +45,12 @@
           proto_ver     :: emqx_types:version(),
           client        :: emqx_types:client(),
           session       :: emqx_session:session(),
-          mountfun      :: fun((emqx_topic:topic()) -> emqx_topic:topic()),
           keepalive     :: non_neg_integer(),
           will_msg      :: emqx_types:message(),
           enable_acl    :: boolean(),
-          is_bridge     :: boolean(),
-          topic_aliases :: map(),
-          alias_maximum :: map()
-        }).
+          topic_aliases :: maybe(map()),
+          alias_maximum :: maybe(map())
+         }).
 
 -opaque(proto_state() :: #protocol{}).
 
@@ -62,8 +61,6 @@ info(#protocol{proto_name = ProtoName,
                session    = Session,
                keepalive  = Keepalive,
                will_msg   = WillMsg,
-               enable_acl = EnableAcl,
-               is_bridge  = IsBridge,
                topic_aliases = Aliases}) ->
     #{proto_name => ProtoName,
       proto_ver => ProtoVer,
@@ -71,8 +68,6 @@ info(#protocol{proto_name = ProtoName,
       session => emqx_session:info(Session),
       keepalive => Keepalive,
       will_msg => WillMsg,
-      enable_acl => EnableAcl,
-      is_bridge => IsBridge,
       topic_aliases => Aliases
      }.
 
@@ -91,8 +86,6 @@ info(session, #protocol{session = Session}) ->
     Session;
 info(keepalive, #protocol{keepalive = Keepalive}) ->
     Keepalive;
-info(is_bridge, #protocol{is_bridge = IsBridge}) ->
-    IsBridge;
 info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
     Aliases.
 
@@ -100,34 +93,35 @@ attrs(#protocol{proto_name = ProtoName,
                 proto_ver  = ProtoVer,
                 client     = Client,
                 session    = Session,
-                keepalive  = Keepalive,
-                is_bridge  = IsBridge}) ->
+                keepalive  = Keepalive}) ->
     #{proto_name => ProtoName,
-      proto_ver => ProtoVer,
-      client => Client,
-      session => emqx_session:attrs(Session),
-      keepalive => Keepalive,
-      is_bridge => IsBridge
+      proto_ver  => ProtoVer,
+      client     => Client,
+      session    => emqx_session:attrs(Session),
+      keepalive  => Keepalive
      }.
 
+caps(#protocol{client = #{zone := Zone}}) ->
+    emqx_mqtt_caps:get_caps(Zone).
+
 -spec(init(map(), proplists:proplist()) -> proto_state()).
 init(ConnInfo, Options) ->
     Zone = proplists:get_value(zone, Options),
-    Peercert = maps:get(peercert, ConnInfo, nossl),
+    Peercert = maps:get(peercert, ConnInfo, undefined),
     Username = peer_cert_as_username(Peercert, Options),
     Mountpoint = emqx_zone:get_env(Zone, mountpoint),
-    Client = maps:merge(#{zone       => Zone,
-                          username   => Username,
-                          mountpoint => Mountpoint
+    EnableAcl = emqx_zone:get_env(Zone, enable_acl, true),
+    Client = maps:merge(#{zone         => Zone,
+                          username     => Username,
+                          mountpoint   => Mountpoint,
+                          is_bridge    => false,
+                          is_superuser => false
                          }, ConnInfo),
-    EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
-    MountFun = fun(Topic) ->
-                       emqx_mountpoint:mount(Mountpoint, Topic)
-               end,
-    #protocol{client     = Client,
-              mountfun   = MountFun,
-              enable_acl = EnableAcl,
-              is_bridge  = false
+    #protocol{proto_ver  = ?MQTT_PROTO_V4,
+              proto_name = <<"MQTT">>,
+              client     = Client,
+              %%mountfun   = MountFun,
+              enable_acl = EnableAcl
              }.
 
 peer_cert_as_username(Peercert, Options) ->
@@ -147,61 +141,39 @@ peer_cert_as_username(Peercert, Options) ->
        | {ok, emqx_types:packet(), proto_state()}
        | {error, Reason :: term(), proto_state()}
        | {stop, Error :: atom(), proto_state()}).
-handle_in(?CONNECT_PACKET(
-             #mqtt_packet_connect{proto_name = ProtoName,
-                                  proto_ver  = ProtoVer,
-                                  is_bridge  = IsBridge,
-                                  client_id  = ClientId,
-                                  username   = Username,
-                                  password   = Password,
-                                  keepalive  = Keepalive} = ConnPkt),
-          PState = #protocol{client = Client}) ->
-    Client1 = maps:merge(Client, #{client_id => ClientId,
-                                   username  => Username,
-                                   password  => Password
-                                  }),
-    emqx_logger:set_metadata_client_id(ClientId),
-    WillMsg = emqx_packet:will_msg(ConnPkt),
-    PState1 = PState#protocol{client     = Client1,
-                              proto_name = ProtoName,
-                              proto_ver  = ProtoVer,
-                              is_bridge  = IsBridge,
-                              keepalive  = Keepalive,
-                              will_msg   = WillMsg
-                             },
-    %% fun validate_packet/2,
-    case pipeline([fun check_connect/2,
-                   fun handle_connect/2], ConnPkt, PState1) of
-        {ok, SP, PState2} ->
-            handle_out({connack, ?RC_SUCCESS, sp(SP)}, PState2);
-        {error, ReasonCode} ->
-            handle_out({connack, ReasonCode}, PState1);
-        {error, ReasonCode, PState2} ->
-            handle_out({connack, ReasonCode}, PState2)
+handle_in(?CONNECT_PACKET(#mqtt_packet_connect{client_id = ClientId} = ConnPkt),
+          PState) ->
+    ok = emqx_logger:set_metadata_client_id(ClientId),
+    case pipeline([fun validate_in/2,
+                   fun preprocess_props/2,
+                   fun check_connect/2,
+                   fun enrich_pstate/2,
+                   fun auth_connect/2], ConnPkt, PState) of
+        {ok, NConnPkt, NPState} ->
+            handle_connect(NConnPkt, NPState);
+        {error, ReasonCode, NPState} ->
+            handle_out({disconnect, ReasonCode}, NPState)
     end;
 
 handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) ->
-    case pipeline([fun validate_packet/2,
-                   fun check_pub_caps/2,
-                   fun check_pub_acl/2,
-                   fun handle_publish/2], Packet, PState) of
-        {error, ReasonCode} ->
-            ?LOG(warning, "Cannot publish qos~w message to ~s due to ~s",
-                 [QoS, Topic, emqx_reason_codes:text(ReasonCode)]),
-            handle_out(case QoS of
-                           ?QOS_0 -> {puberr, ReasonCode};
-                           ?QOS_1 -> {puback, PacketId, ReasonCode};
-                           ?QOS_2 -> {pubrec, PacketId, ReasonCode}
-                       end, PState);
-        Result -> Result
+    case pipeline([fun validate_in/2,
+                   fun preprocess_props/2,
+                   fun check_publish/2], Packet, PState) of
+        {ok, NPacket, NPState} ->
+            handle_publish(NPacket, NPState);
+        {error, ReasonCode, PState1} ->
+            ?LOG(warning, "Cannot publish message to ~s due to ~s",
+                 [Topic, emqx_reason_codes:text(ReasonCode)]),
+            handle_puback(QoS, PacketId, ReasonCode, PState1)
     end;
 
 handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
     case emqx_session:puback(PacketId, ReasonCode, Session) of
         {ok, NSession} ->
             {ok, PState#protocol{session = NSession}};
+        {ok, Publishes, NSession} ->
+            handle_out({publish, Publishes}, PState#protocol{session = NSession});
         {error, _NotFound} ->
-            %% TODO: metrics? error msg?
             {ok, PState}
     end;
 
@@ -221,35 +193,37 @@ handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
             handle_out({pubcomp, PacketId, ReasonCode}, PState)
     end;
 
-handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode),
-          PState = #protocol{session = Session}) ->
+handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
     case emqx_session:pubcomp(PacketId, ReasonCode, Session) of
         {ok, NSession} ->
             {ok, PState#protocol{session = NSession}};
-        {error, _ReasonCode} ->
-            %% TODO: How to handle the reason code?
+        {ok, Publishes, NSession} ->
+            handle_out({publish, Publishes}, PState#protocol{session = NSession});
+        {error, _NotFound} ->
             {ok, PState}
     end;
 
 handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           PState = #protocol{client = Client}) ->
-    case validate(Packet) of
-        ok -> ok = emqx_hooks:run('client.subscribe',
-                                  [Client, Properties, TopicFilters]),
-              TopicFilters1 = enrich_subid(Properties, TopicFilters),
-              {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState),
-              handle_out({suback, PacketId, ReasonCodes}, PState1);
+    case validate_in(Packet, PState) of
+        ok ->
+            ok = emqx_hooks:run('client.subscribe',
+                                [Client, Properties, TopicFilters]),
+            TopicFilters1 = enrich_subid(Properties, TopicFilters),
+            {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState),
+            handle_out({suback, PacketId, ReasonCodes}, PState1);
         {error, ReasonCode} ->
             handle_out({disconnect, ReasonCode}, PState)
     end;
 
 handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           PState = #protocol{client = Client}) ->
-    case validate(Packet) of
-        ok -> ok = emqx_hooks:run('client.unsubscribe',
-                                  [Client, Properties, TopicFilters]),
-              {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState),
-              handle_out({unsuback, PacketId, ReasonCodes}, PState1);
+    case validate_in(Packet, PState) of
+        ok ->
+            ok = emqx_hooks:run('client.unsubscribe',
+                                [Client, Properties, TopicFilters]),
+            {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState),
+            handle_out({unsuback, PacketId, ReasonCodes}, PState1);
         {error, ReasonCode} ->
             handle_out({disconnect, ReasonCode}, PState)
     end;
@@ -280,7 +254,7 @@ handle_in(Packet, PState) ->
 
 handle_deliver(Delivers, PState = #protocol{client = Client, session = Session})
   when is_list(Delivers) ->
-    case emqx_session:handle(Delivers, Session) of
+    case emqx_session:deliver(Delivers, Session) of
         {ok, Publishes, NSession} ->
             Packets = lists:map(fun({publish, PacketId, Msg}) ->
                                         Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
@@ -293,12 +267,24 @@ handle_deliver(Delivers, PState = #protocol{client = Client, session = Session})
             {ok, PState#protocol{session = NSession}}
     end.
 
+%%--------------------------------------------------------------------
+%% Handle puback
+%%--------------------------------------------------------------------
+
+handle_puback(?QOS_0, _PacketId, ReasonCode, PState) ->
+    handle_out({puberr, ReasonCode}, PState);
+handle_puback(?QOS_1, PacketId, ReasonCode, PState) ->
+    handle_out({puback, PacketId, ReasonCode}, PState);
+handle_puback(?QOS_2, PacketId, ReasonCode, PState) ->
+    handle_out({pubrec, PacketId, ReasonCode}, PState).
+
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
 
 handle_out({connack, ?RC_SUCCESS, SP}, PState = #protocol{client = Client}) ->
-    ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, info(PState)]),
+    ok = emqx_hooks:run('client.connected',
+                        [Client, ?RC_SUCCESS, info(PState)]),
     Props = #{}, %% TODO: ...
     {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, Props), PState};
 
@@ -312,13 +298,24 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client,
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState};
 
+handle_out({publish, Publishes}, PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) ->
+    Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end,
+    Packets = lists:map(
+                fun({publish, PacketId, Msg}) ->
+                        Msg1 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
+                        Msg2 = Mount(emqx_message:update_expiry(Msg1)),
+                        emqx_packet:from_message(PacketId, Msg2)
+                end, Publishes),
+    {ok, Packets, PState};
+
 handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) ->
     Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
     Msg1 = emqx_message:update_expiry(Msg0),
     Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1),
     {ok, emqx_packet:from_message(PacketId, Msg2), PState};
 
-handle_out({puberr, ReasonCode}, PState) ->
+%% TODO: How to handle the err?
+handle_out({puberr, _ReasonCode}, PState) ->
     {ok, PState};
 
 handle_out({puback, PacketId, ReasonCode}, PState) ->
@@ -364,77 +361,265 @@ handle_timeout(TRef, Msg, PState = #protocol{session = Session}) ->
         {ok, NSession} ->
             {ok, PState#protocol{session = NSession}};
         {ok, Publishes, NSession} ->
-            %% TODO: handle out...
-            io:format("Timeout publishes: ~p~n", [Publishes]),
-            {ok, PState#protocol{session = NSession}}
+            handle_out({publish, Publishes}, PState#protocol{session = NSession})
     end.
 
-terminate(Reason, _State) ->
+terminate(Reason, _PState) ->
     io:format("Terminated for ~p~n", [Reason]),
     ok.
 
+%%--------------------------------------------------------------------
+%% Validate incoming packet
+%%--------------------------------------------------------------------
+
+-spec(validate_in(emqx_types:packet(), proto_state())
+      -> ok | {error, emqx_types:reason_code()}).
+validate_in(Packet, _PState) ->
+    try emqx_packet:validate(Packet) of
+        true -> ok
+    catch
+        error:protocol_error ->
+            {error, ?RC_PROTOCOL_ERROR};
+        error:subscription_identifier_invalid ->
+            {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
+        error:topic_alias_invalid ->
+            {error, ?RC_TOPIC_ALIAS_INVALID};
+        error:topic_filters_invalid ->
+            {error, ?RC_TOPIC_FILTER_INVALID};
+        error:topic_name_invalid ->
+            {error, ?RC_TOPIC_FILTER_INVALID};
+        error:_Reason ->
+            {error, ?RC_MALFORMED_PACKET}
+    end.
+
+%%--------------------------------------------------------------------
+%% PreProcess Properties
+%%--------------------------------------------------------------------
+
+preprocess_props(#mqtt_packet_connect{
+                    properties = #{'Topic-Alias-Maximum' := Max}
+                   },
+                 PState = #protocol{alias_maximum = AliasMaximum}) ->
+    {ok, PState#protocol{alias_maximum = AliasMaximum#{outbound => Max}}};
+
+preprocess_props(Packet = #mqtt_packet{variable = Publish}, PState) ->
+    case preprocess_props(Publish, PState) of
+        {ok, Publish1, PState1} ->
+            {ok, Packet#mqtt_packet{variable = Publish1}, PState1};
+        Error -> Error
+    end;
+
+preprocess_props(Publish = #mqtt_packet_publish{topic_name = <<>>,
+                                                properties = #{'Topic-Alias' := AliasId}
+                                               },
+                 PState = #protocol{topic_aliases = TopicAliases}) ->
+    case maps:find(AliasId, TopicAliases) of
+        {ok, Topic} ->
+            {ok, Publish#mqtt_packet_publish{topic_name = Topic}, PState};
+        false -> {error, ?RC_TOPIC_ALIAS_INVALID}
+    end;
+
+preprocess_props(Publish = #mqtt_packet_publish{topic_name = Topic,
+                                                properties = #{'Topic-Alias' := AliasId}
+                                               },
+                 PState = #protocol{topic_aliases = Aliases}) ->
+    Aliases1 = maps:put(AliasId, Topic, Aliases),
+    {ok, Publish, PState#protocol{topic_aliases = Aliases1}};
+
+preprocess_props(Packet, PState) ->
+    {ok, Packet, PState}.
+
 %%--------------------------------------------------------------------
 %% Check Connect Packet
 %%--------------------------------------------------------------------
 
-check_connect(_ConnPkt, PState) ->
-    {ok, PState}.
+check_connect(ConnPkt, PState) ->
+    pipeline([fun check_proto_ver/2,
+              fun check_client_id/2,
+              %%fun check_flapping/2,
+              fun check_banned/2,
+              fun check_will_topic/2,
+              fun check_will_retain/2], ConnPkt, PState).
+
+check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
+                                     proto_name = Name}, _PState) ->
+    case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
+        true  -> ok;
+        false -> {error, ?RC_PROTOCOL_ERROR}
+    end.
+
+%% MQTT3.1 does not allow null clientId
+check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
+                                     client_id = <<>>}, _PState) ->
+    {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
+
+%% Issue#599: Null clientId and clean_start = false
+check_client_id(#mqtt_packet_connect{client_id   = <<>>,
+                                     clean_start = false}, _PState) ->
+    {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
+check_client_id(#mqtt_packet_connect{client_id   = <<>>,
+                                     clean_start = true}, _PState) ->
+    ok;
+check_client_id(#mqtt_packet_connect{client_id = ClientId},
+                #protocol{client = #{zone := Zone}}) ->
+    Len = byte_size(ClientId),
+    MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
+    case (1 =< Len) andalso (Len =< MaxLen) of
+        true  -> ok;
+        false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
+    end.
+
+check_banned(#mqtt_packet_connect{client_id = ClientId,
+                                  username = Username},
+             #protocol{client = Client = #{zone := Zone}}) ->
+    case emqx_zone:get_env(Zone, enable_ban, false) of
+        true ->
+            case emqx_banned:check(Client#{client_id => ClientId,
+                                           username  => Username}) of
+                true  -> {error, ?RC_BANNED};
+                false -> ok
+            end;
+        false -> ok
+    end.
+
+check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
+    ok;
+check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _PState) ->
+    try emqx_topic:validate(WillTopic) of
+        true -> ok
+    catch error:_Error ->
+        {error, ?RC_TOPIC_NAME_INVALID}
+    end.
+
+check_will_retain(#mqtt_packet_connect{will_retain = false}, _PState) ->
+    ok;
+check_will_retain(#mqtt_packet_connect{will_retain = true},
+                  #protocol{client = #{zone := Zone}}) ->
+    case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
+        true  -> ok;
+        false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
+    end.
 
 %%--------------------------------------------------------------------
-%% Handle Connect Packet
+%% Enrich state
 %%--------------------------------------------------------------------
 
-handle_connect(#mqtt_packet_connect{proto_name  = ProtoName,
-                                    proto_ver   = ProtoVer,
-                                    is_bridge   = IsBridge,
-                                    clean_start = CleanStart,
-                                    keepalive   = Keepalive,
-                                    properties  = ConnProps,
-                                    client_id   = ClientId,
-                                    username    = Username,
-                                    password    = Password
-                                   } = ConnPkt,
-               PState = #protocol{client = Client}) ->
-    case emqx_access_control:authenticate(
-           Client#{password => Password}) of
+enrich_pstate(#mqtt_packet_connect{proto_name = ProtoName,
+                                   proto_ver  = ProtoVer,
+                                   keepalive  = Keepalive,
+                                   client_id  = ClientId,
+                                   username   = Username,
+                                   is_bridge  = IsBridge
+                                  },
+              PState = #protocol{client = Client}) ->
+    Client1 = maps:merge(Client, #{client_id => ClientId,
+                                   username  => Username,
+                                   is_bridge => IsBridge
+                                  }),
+    {ok, PState#protocol{proto_name = ProtoName,
+                         proto_ver  = ProtoVer,
+                         client     = Client1,
+                         keepalive  = Keepalive}}.
+
+%%--------------------------------------------------------------------
+%% Auth Connect
+%%--------------------------------------------------------------------
+
+auth_connect(#mqtt_packet_connect{client_id = ClientId,
+                                  username  = Username,
+                                  password  = Password},
+             PState = #protocol{client = Client}) ->
+    case emqx_access_control:authenticate(Client#{password => Password}) of
         {ok, AuthResult} ->
-            Client1 = maps:merge(Client, AuthResult),
-            %% Open session
-            case open_session(ConnPkt, PState) of
-                {ok, Session, SP} ->
-                    PState1 = PState#protocol{client = Client1,
-                                              session = Session},
-                    ok = emqx_cm:register_channel(ClientId),
-                    {ok, SP, PState1};
-                {error, Error} ->
-                    ?LOG(error, "Failed to open session: ~p", [Error]),
-                    {error, ?RC_UNSPECIFIED_ERROR, PState#protocol{client = Client1}}
-            end;
+            {ok, PState#protocol{client = maps:merge(Client, AuthResult)}};
         {error, Reason} ->
             ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p",
                  [ClientId, Username, Reason]),
-            {error, emqx_reason_codes:connack_error(Reason), PState}
+            {error, Reason}
     end.
 
+%%--------------------------------------------------------------------
+%% Handle Connect
+%%--------------------------------------------------------------------
+
+handle_connect(ConnPkt, PState) ->
+    case open_session(ConnPkt, PState) of
+        {ok, Session, SP} ->
+            WillMsg = emqx_packet:will_msg(ConnPkt),
+            handle_out({connack, ?RC_SUCCESS, sp(SP)},
+                       PState#protocol{session = Session, will_msg = WillMsg});
+        {error, Reason} ->
+            %% TODO: Unknown error?
+            ?LOG(error, "Failed to open session: ~p", [Reason]),
+            handle_out({connack, ?RC_UNSPECIFIED_ERROR}, PState)
+    end.
+
+%%--------------------------------------------------------------------
+%% Open session
+%%--------------------------------------------------------------------
+
 open_session(#mqtt_packet_connect{clean_start = CleanStart,
-                                  %%properties  = ConnProps,
-                                  client_id   = ClientId,
-                                  username    = Username} = ConnPkt,
-             PState = #protocol{client = Client}) ->
-    emqx_cm:open_session(maps:merge(Client, #{clean_start     => CleanStart,
-                                              max_inflight    => 0,
-                                              expiry_interval => 0})).
+                                  properties  = ConnProps},
+             #protocol{client = Client = #{zone := Zone}}) ->
+    MaxInflight = maps:get('Receive-Maximum', ConnProps,
+                           emqx_zone:get_env(Zone, max_inflight, 65535)),
+    Interval = maps:get('Session-Expiry-Interval', ConnProps,
+                        emqx_zone:get_env(Zone, session_expiry_interval, 0)),
+    emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight,
+                                               expiry_interval => Interval
+                                              }).
 
 %%--------------------------------------------------------------------
 %% Handle Publish Message: Client -> Broker
 %%--------------------------------------------------------------------
 
-handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId),
-               PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) ->
+
+%% Check Publish
+check_publish(Packet, PState) ->
+    pipeline([fun check_pub_alias/2,
+              fun check_pub_caps/2,
+              fun check_pub_acl/2], Packet, PState).
+
+%% Check Pub Alias
+check_pub_alias(#mqtt_packet{
+                   variable = #mqtt_packet_publish{
+                                 properties = #{'Topic-Alias' := AliasId}
+                                }
+                  },
+                #protocol{alias_maximum = Limits}) ->
+    case (Limits == undefined)
+            orelse (Max = maps:get(inbound, Limits, 0)) == 0
+                orelse (AliasId > Max) of
+        true  -> {error, ?RC_TOPIC_ALIAS_INVALID};
+        false -> ok
+    end;
+check_pub_alias(_Packet, _PState) -> ok.
+
+%% Check Pub Caps
+check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
+                                                         retain = Retain
+                                                        }},
+               #protocol{client = #{zone := Zone}}) ->
+    emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
+
+%% Check Pub ACL
+check_pub_acl(_Packet, #protocol{enable_acl = false}) ->
+    ok;
+check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) ->
+    ok;
+check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
+              #protocol{client = Client}) ->
+    case emqx_access_control:check_acl(Client, publish, Topic) of
+        allow -> ok;
+        deny  -> {error, ?RC_NOT_AUTHORIZED}
+    end.
+
+handle_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
+               PState = #protocol{client = Client = #{mountpoint := MountPoint}}) ->
     %% TODO: ugly... publish_to_msg(...)
-    Msg = emqx_packet:to_message(Client, Packet),
-    Msg1 = emqx_mountpoint:mount(Mountpoint, Msg),
-    Msg2 = emqx_message:set_flag(dup, false, Msg1),
+    Mount = fun(Msg) -> emqx_mountpoint:mount(MountPoint, Msg) end,
+    Msg1 = emqx_packet:to_message(Client, Packet),
+    Msg2 = Mount(emqx_message:set_flag(dup, false, Msg1)),
     handle_publish(PacketId, Msg2, PState).
 
 handle_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) ->
@@ -472,15 +657,15 @@ handle_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
     handle_subscribe(More, [RC|Acc], PState1).
 
 do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
-             PState = #protocol{client   = Client,
-                                session  = Session,
-                                mountfun = Mount}) ->
+             PState = #protocol{client  = Client = #{mountpoint := Mountpoint},
+                                session = Session}) ->
+    Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end,
     %% 1. Parse 2. Check 3. Enrich 5. MountPoint 6. Session
     SubOpts1 = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
     {TopicFilter1, SubOpts2} = emqx_topic:parse(TopicFilter, SubOpts1),
     SubOpts3 = enrich_subopts(SubOpts2, PState),
     case check_subscribe(TopicFilter1, PState) of
-        ok ->
+        {ok, _, _} -> %% TODO:...
             TopicFilter2 = Mount(TopicFilter1),
             case emqx_session:subscribe(Client, TopicFilter2, SubOpts3, Session) of
                 {ok, NSession} ->
@@ -497,14 +682,30 @@ enrich_subid(_Properties, TopicFilters) ->
 
 enrich_subopts(SubOpts, #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
     SubOpts;
-enrich_subopts(SubOpts, #protocol{client = #{zone := Zone},
-                                  is_bridge = IsBridge}) ->
+enrich_subopts(SubOpts, #protocol{client = #{zone := Zone,
+                                             is_bridge := IsBridge}}) ->
     Rap = flag(IsBridge),
     Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)),
     SubOpts#{rap => Rap, nl => Nl}.
 
-check_subscribe(_TopicFilter, _PState) ->
-    ok.
+check_subscribe(TopicFilter, PState) ->
+    pipeline([%%TODO: fun check_sub_caps/2,
+              fun check_sub_acl/2], TopicFilter, PState).
+
+%% Check Sub Caps
+check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) ->
+    emqx_mqtt_caps:check_sub(Zone, TopicFilter).
+
+%% Check Sub ACL
+check_sub_acl(_TopicFilter, #protocol{enable_acl = false}) ->
+    ok;
+check_sub_acl(_TopicFilter, #protocol{client = #{is_superuser := true}}) ->
+    ok;
+check_sub_acl(TopicFilter, #protocol{client = Client}) ->
+    case emqx_access_control:check_acl(Client, subscribe, TopicFilter) of
+        allow -> ok;
+        deny  -> {error, ?RC_NOT_AUTHORIZED}
+    end.
 
 %%--------------------------------------------------------------------
 %% Handle Unsubscribe Request
@@ -520,9 +721,9 @@ handle_unsubscribe([TopicFilter|More], Acc, PState) ->
     {RC, PState1} = do_unsubscribe(TopicFilter, PState),
     handle_unsubscribe(More, [RC|Acc], PState1).
 
-do_unsubscribe(TopicFilter, PState = #protocol{client   = Client,
-                                               session  = Session,
-                                               mountfun = Mount}) ->
+do_unsubscribe(TopicFilter, PState = #protocol{client   = Client = #{mountpoint := Mountpoint},
+                                               session  = Session}) ->
+    Mount = fun(Topic) -> emqx_mountpoint:mount(Mountpoint, Topic) end,
     TopicFilter1 = Mount(element(1, emqx_topic:parse(TopicFilter))),
     case emqx_session:unsubscribe(Client, TopicFilter1, Session) of
         {ok, NSession} ->
@@ -530,65 +731,13 @@ do_unsubscribe(TopicFilter, PState = #protocol{client   = Client,
         {error, RC} -> {RC, PState}
     end.
 
-%%--------------------------------------------------------------------
-%% Validate Incoming Packet
-%%--------------------------------------------------------------------
-
-validate_packet(Packet, _PState) ->
-    validate(Packet).
-
--spec(validate(emqx_types:packet()) -> ok | {error, emqx_types:reason_code()}).
-validate(Packet) ->
-    try emqx_packet:validate(Packet) of
-        true -> ok
-    catch
-        error:protocol_error ->
-            {error, ?RC_PROTOCOL_ERROR};
-        error:subscription_identifier_invalid ->
-            {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
-        error:topic_alias_invalid ->
-            {error, ?RC_TOPIC_ALIAS_INVALID};
-        error:topic_filters_invalid ->
-            {error, ?RC_TOPIC_FILTER_INVALID};
-        error:topic_name_invalid ->
-            {error, ?RC_TOPIC_FILTER_INVALID};
-        error:_Reason ->
-            {error, ?RC_MALFORMED_PACKET}
-    end.
-
-%%--------------------------------------------------------------------
-%% Check Publish
-%%--------------------------------------------------------------------
-
-check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
-                                                         retain = Retain},
-                            variable = #mqtt_packet_publish{}},
-               #protocol{client = #{zone := Zone}}) ->
-    emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
-
-check_pub_acl(_Packet, #protocol{enable_acl = false}) ->
-    ok;
-check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) ->
-    ok;
-check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
-              #protocol{client = Client}) ->
-    do_acl_check(Client, publish, Topic).
-
-check_sub_acl(_Packet, #protocol{enable_acl = false}) ->
-    ok.
-
-do_acl_check(Client, PubSub, Topic) ->
-    case emqx_access_control:check_acl(Client, PubSub, Topic) of
-        allow -> ok;
-        deny -> {error, ?RC_NOT_AUTHORIZED}
-    end.
-
 %%--------------------------------------------------------------------
 %% Pipeline
 %%--------------------------------------------------------------------
 
-pipeline([Fun], Packet, PState) ->
-    Fun(Packet, PState);
+pipeline([], Packet, PState) ->
+    {ok, Packet, PState};
+
 pipeline([Fun|More], Packet, PState) ->
     case Fun(Packet, PState) of
         ok -> pipeline(More, Packet, PState);
@@ -597,7 +746,9 @@ pipeline([Fun|More], Packet, PState) ->
         {ok, NPacket, NPState} ->
             pipeline(More, NPacket, NPState);
         {error, Reason} ->
-            {error, Reason}
+            {error, Reason, PState};
+        {error, Reason, NPState} ->
+            {error, Reason, NPState}
     end.
 
 %%--------------------------------------------------------------------

+ 16 - 17
src/emqx_session.erl

@@ -50,7 +50,7 @@
 
 -logger_header("[Session]").
 
--export([init/1]).
+-export([init/3]).
 
 -export([ info/1
         , attrs/1
@@ -68,7 +68,7 @@
         , pubcomp/3
         ]).
 
--export([handle/2]).
+-export([deliver/2]).
 
 -export([timeout/3]).
 
@@ -143,11 +143,9 @@
 %%--------------------------------------------------------------------
 
 %% @doc Init a session.
--spec(init(Attrs :: map()) -> session()).
-init(#{zone            := Zone,
-       clean_start     := CleanStart,
-       max_inflight    := MaxInflight,
-       expiry_interval := ExpiryInterval}) ->
+-spec(init(boolean(), emqx_types:client(), Options :: map()) -> session()).
+init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight,
+                                    expiry_interval := ExpiryInterval}) ->
     #session{clean_start       = CleanStart,
              max_subscriptions = get_env(Zone, max_subscriptions, 0),
              subscriptions     = #{},
@@ -361,6 +359,7 @@ pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
             {ok, Session#session{inflight = Inflight1}};
         {value, {pubrel, _Ts}} ->
             ?LOG(warning, "The PUBREC ~w is duplicated", [PacketId]),
+            ok = emqx_metrics:inc('packets.pubrec.inuse'),
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
@@ -410,7 +409,7 @@ dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
         true  -> {ok, Session};
         false ->
             {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
-            handle(lists:reverse(Msgs), [], Session#session{mqueue = Q1})
+            deliver(lists:reverse(Msgs), [], Session#session{mqueue = Q1})
     end.
 
 dequeue(Cnt, Msgs, Q) when Cnt =< 0 ->
@@ -433,28 +432,28 @@ batch_n(Inflight) ->
 %% Broker -> Client: Publish | Msg
 %%--------------------------------------------------------------------
 
-handle(Delivers, Session = #session{subscriptions = Subs})
+deliver(Delivers, Session = #session{subscriptions = Subs})
   when is_list(Delivers) ->
     Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session)
             || {deliver, Topic, Msg} <- Delivers],
-    handle(Msgs, [], Session).
+    deliver(Msgs, [], Session).
 
-handle([], Publishes, Session) ->
+deliver([], Publishes, Session) ->
     {ok, lists:reverse(Publishes), Session};
 
-handle([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
-    handle(More, [{publish, undefined, Msg}|Acc], Session);
+deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
+    deliver(More, [{publish, undefined, Msg}|Acc], Session);
 
-handle([Msg = #message{qos = QoS}|More], Acc,
-       Session = #session{next_pkt_id = PacketId, inflight = Inflight})
+deliver([Msg = #message{qos = QoS}|More], Acc,
+        Session = #session{next_pkt_id = PacketId, inflight = Inflight})
     when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
     case emqx_inflight:is_full(Inflight) of
         true ->
-            handle(More, Acc, enqueue(Msg, Session));
+            deliver(More, Acc, enqueue(Msg, Session));
         false ->
             Publish = {publish, PacketId, Msg},
             Session1 = await(PacketId, Msg, Session),
-            handle(More, [Publish|Acc], next_pkt_id(Session1))
+            deliver(More, [Publish|Acc], next_pkt_id(Session1))
     end.
 
 enqueue(Msg, Session = #session{mqueue = Q}) ->

+ 1 - 1
src/emqx_shared_sub.erl

@@ -118,7 +118,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F
         {Type, SubPid} ->
             case do_dispatch(SubPid, Topic, Msg, Type) of
                 ok ->
-                    Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
+                    Delivery#delivery{results = [{deliver, {Group, Topic}, 1} | Results]};
                 {error, _Reason} ->
                     %% Failed to dispatch to this sub, try next.
                     dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])