Преглед изворни кода

Improve the pipeline design of protocol and session

Feng Lee пре 6 година
родитељ
комит
2fdda99d6d
2 измењених фајлова са 350 додато и 249 уклоњено
  1. 344 247
      src/emqx_protocol.erl
  2. 6 2
      src/emqx_session.erl

+ 344 - 247
src/emqx_protocol.erl

@@ -38,44 +38,47 @@
         , terminate/2
         ]).
 
+-import(emqx_access_control,
+        [ authenticate/1
+        , check_acl/3
+        ]).
+
 -export_type([proto_state/0]).
 
 -record(protocol, {
-          proto_name    :: binary(),
-          proto_ver     :: emqx_types:version(),
           client        :: emqx_types:client(),
           session       :: emqx_session:session(),
+          proto_name    :: binary(),
+          proto_ver     :: emqx_types:ver(),
           keepalive     :: non_neg_integer(),
           will_msg      :: emqx_types:message(),
-          enable_acl    :: boolean(),
           topic_aliases :: maybe(map()),
-          alias_maximum :: maybe(map())
+          alias_maximum :: maybe(map()),
+          ack_props     :: maybe(emqx_types:properties()) %% Tmp props
          }).
 
 -opaque(proto_state() :: #protocol{}).
 
+-define(NO_PROPS, undefined).
+
 -spec(info(proto_state()) -> emqx_types:infos()).
-info(#protocol{proto_name = ProtoName,
-               proto_ver  = ProtoVer,
-               client     = Client,
+info(#protocol{client     = Client,
                session    = Session,
+               proto_name = ProtoName,
+               proto_ver  = ProtoVer,
                keepalive  = Keepalive,
                will_msg   = WillMsg,
                topic_aliases = Aliases}) ->
-    #{proto_name => ProtoName,
-      proto_ver => ProtoVer,
-      client => Client,
+    #{client => Client,
       session => emqx_session:info(Session),
+      proto_name => ProtoName,
+      proto_ver => ProtoVer,
       keepalive => Keepalive,
       will_msg => WillMsg,
       topic_aliases => Aliases
      }.
 
 -spec(info(atom(), proto_state()) -> term()).
-info(proto_name, #protocol{proto_name = ProtoName}) ->
-    ProtoName;
-info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
-    ProtoVer;
 info(client, #protocol{client = Client}) ->
     Client;
 info(zone, #protocol{client = #{zone := Zone}}) ->
@@ -84,53 +87,54 @@ info(client_id, #protocol{client = #{client_id := ClientId}}) ->
     ClientId;
 info(session, #protocol{session = Session}) ->
     Session;
+info(proto_name, #protocol{proto_name = ProtoName}) ->
+    ProtoName;
+info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
+    ProtoVer;
 info(keepalive, #protocol{keepalive = Keepalive}) ->
     Keepalive;
 info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
     Aliases.
 
-attrs(#protocol{proto_name = ProtoName,
-                proto_ver  = ProtoVer,
-                client     = Client,
+attrs(#protocol{client     = Client,
                 session    = Session,
+                proto_name = ProtoName,
+                proto_ver  = ProtoVer,
                 keepalive  = Keepalive}) ->
-    #{proto_name => ProtoName,
-      proto_ver  => ProtoVer,
-      client     => Client,
+    #{client     => Client,
       session    => emqx_session:attrs(Session),
+      proto_name => ProtoName,
+      proto_ver  => ProtoVer,
       keepalive  => Keepalive
      }.
 
 caps(#protocol{client = #{zone := Zone}}) ->
     emqx_mqtt_caps:get_caps(Zone).
 
--spec(init(map(), proplists:proplist()) -> proto_state()).
+-spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()).
 init(ConnInfo, Options) ->
     Zone = proplists:get_value(zone, Options),
     Peercert = maps:get(peercert, ConnInfo, undefined),
-    Username = peer_cert_as_username(Peercert, Options),
-    Mountpoint = emqx_zone:get_env(Zone, mountpoint),
-    EnableAcl = emqx_zone:get_env(Zone, enable_acl, true),
+    Username = case peer_cert_as_username(Options) of
+                   cn  -> esockd_peercert:common_name(Peercert);
+                   dn  -> esockd_peercert:subject(Peercert);
+                   crt -> Peercert;
+                   _   -> undefined
+               end,
+    MountPoint = emqx_zone:get_env(Zone, mountpoint),
     Client = maps:merge(#{zone         => Zone,
                           username     => Username,
-                          mountpoint   => Mountpoint,
+                          mountpoint   => MountPoint,
                           is_bridge    => false,
                           is_superuser => false
                          }, ConnInfo),
-    #protocol{proto_ver  = ?MQTT_PROTO_V4,
+    #protocol{client     = Client,
               proto_name = <<"MQTT">>,
-              client     = Client,
-              %%mountfun   = MountFun,
-              enable_acl = EnableAcl
+              proto_ver  = ?MQTT_PROTO_V4
              }.
 
-peer_cert_as_username(Peercert, Options) ->
-    case proplists:get_value(peer_cert_as_username, Options) of
-        cn  -> esockd_peercert:common_name(Peercert);
-        dn  -> esockd_peercert:subject(Peercert);
-        crt -> Peercert;
-        _   -> undefined
-    end.
+peer_cert_as_username(Options) ->
+    proplists:get_value(peer_cert_as_username, Options).
 
 %%--------------------------------------------------------------------
 %% Handle incoming packet
@@ -139,40 +143,49 @@ peer_cert_as_username(Peercert, Options) ->
 -spec(handle_in(emqx_types:packet(), proto_state())
       -> {ok, proto_state()}
        | {ok, emqx_types:packet(), proto_state()}
+       | {ok, list(emqx_types:packet()), proto_state()}
        | {error, Reason :: term(), proto_state()}
        | {stop, Error :: atom(), proto_state()}).
-handle_in(?CONNECT_PACKET(#mqtt_packet_connect{client_id = ClientId} = ConnPkt),
-          PState) ->
+handle_in(?CONNECT_PACKET(
+             #mqtt_packet_connect{proto_name = ProtoName,
+                                  proto_ver  = ProtoVer,
+                                  keepalive  = Keepalive,
+                                  client_id  = ClientId
+                                 } = ConnPkt), PState) ->
+    PState1 = PState#protocol{proto_name = ProtoName,
+                              proto_ver  = ProtoVer,
+                              keepalive  = Keepalive
+                             },
     ok = emqx_logger:set_metadata_client_id(ClientId),
     case pipeline([fun validate_in/2,
-                   fun preprocess_props/2,
+                   fun process_props/2,
                    fun check_connect/2,
-                   fun enrich_pstate/2,
-                   fun auth_connect/2], ConnPkt, PState) of
+                   fun enrich_client/2,
+                   fun auth_connect/2], ConnPkt, PState1) of
         {ok, NConnPkt, NPState} ->
-            handle_connect(NConnPkt, NPState);
+            process_connect(NConnPkt, maybe_assign_clientid(NPState));
         {error, ReasonCode, NPState} ->
             handle_out({disconnect, ReasonCode}, NPState)
     end;
 
 handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) ->
     case pipeline([fun validate_in/2,
-                   fun preprocess_props/2,
+                   fun process_alias/2,
                    fun check_publish/2], Packet, PState) of
         {ok, NPacket, NPState} ->
-            handle_publish(NPacket, NPState);
-        {error, ReasonCode, PState1} ->
+            process_publish(NPacket, NPState);
+        {error, ReasonCode, NPState} ->
             ?LOG(warning, "Cannot publish message to ~s due to ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode)]),
-            handle_puback(QoS, PacketId, ReasonCode, PState1)
+            puback(QoS, PacketId, ReasonCode, NPState)
     end;
 
 handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
     case emqx_session:puback(PacketId, ReasonCode, Session) of
-        {ok, NSession} ->
-            {ok, PState#protocol{session = NSession}};
         {ok, Publishes, NSession} ->
             handle_out({publish, Publishes}, PState#protocol{session = NSession});
+        {ok, NSession} ->
+            {ok, PState#protocol{session = NSession}};
         {error, _NotFound} ->
             {ok, PState}
     end;
@@ -195,10 +208,10 @@ handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
 
 handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
     case emqx_session:pubcomp(PacketId, ReasonCode, Session) of
-        {ok, NSession} ->
-            {ok, PState#protocol{session = NSession}};
         {ok, Publishes, NSession} ->
             handle_out({publish, Publishes}, PState#protocol{session = NSession});
+        {ok, NSession} ->
+            {ok, PState#protocol{session = NSession}};
         {error, _NotFound} ->
             {ok, PState}
     end;
@@ -206,12 +219,14 @@ handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Se
 handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           PState = #protocol{client = Client}) ->
     case validate_in(Packet, PState) of
-        ok ->
-            ok = emqx_hooks:run('client.subscribe',
-                                [Client, Properties, TopicFilters]),
-            TopicFilters1 = enrich_subid(Properties, TopicFilters),
-            {ReasonCodes, PState1} = handle_subscribe(TopicFilters1, PState),
-            handle_out({suback, PacketId, ReasonCodes}, PState1);
+        ok -> TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts)
+                               || {TopicFilter, SubOpts} <- TopicFilters],
+              TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
+                                                  [Client, Properties],
+                                                  TopicFilters1),
+              TopicFilters3 = enrich_subid(Properties, TopicFilters2),
+              {ReasonCodes, NPState} = process_subscribe(TopicFilters3, PState),
+              handle_out({suback, PacketId, ReasonCodes}, NPState);
         {error, ReasonCode} ->
             handle_out({disconnect, ReasonCode}, PState)
     end;
@@ -219,11 +234,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
 handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
           PState = #protocol{client = Client}) ->
     case validate_in(Packet, PState) of
-        ok ->
-            ok = emqx_hooks:run('client.unsubscribe',
-                                [Client, Properties, TopicFilters]),
-            {ReasonCodes, PState1} = handle_unsubscribe(TopicFilters, PState),
-            handle_out({unsuback, PacketId, ReasonCodes}, PState1);
+        ok -> TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters),
+              TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe',
+                                                  [Client, Properties],
+                                                  TopicFilters1),
+              {ReasonCodes, NPState} = process_unsubscribe(TopicFilters2, PState),
+              handle_out({unsuback, PacketId, ReasonCodes}, NPState);
         {error, ReasonCode} ->
             handle_out({disconnect, ReasonCode}, PState)
     end;
@@ -238,7 +254,7 @@ handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
 handle_in(?DISCONNECT_PACKET(RC), PState = #protocol{proto_ver = Ver}) ->
     %% TODO:
     %% {stop, {shutdown, abnormal_disconnet}, PState};
-    {sto, {shutdown, emqx_reason_codes:name(RC, Ver)}, PState};
+    {stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, PState};
 
 handle_in(?AUTH_PACKET(), PState) ->
     %%TODO: implement later.
@@ -252,45 +268,65 @@ handle_in(Packet, PState) ->
 %% Handle delivers
 %%--------------------------------------------------------------------
 
-handle_deliver(Delivers, PState = #protocol{client = Client, session = Session})
+handle_deliver(Delivers, PState = #protocol{session = Session})
   when is_list(Delivers) ->
     case emqx_session:deliver(Delivers, Session) of
         {ok, Publishes, NSession} ->
-            Packets = lists:map(fun({publish, PacketId, Msg}) ->
-                                        Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
-                                        Msg1 = emqx_message:update_expiry(Msg0),
-                                        Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1),
-                                        emqx_packet:from_message(PacketId, Msg2)
-                                end, Publishes),
-            {ok, Packets, PState#protocol{session = NSession}};
+            handle_out({publish, Publishes}, PState#protocol{session = NSession});
         {ok, NSession} ->
             {ok, PState#protocol{session = NSession}}
     end.
 
-%%--------------------------------------------------------------------
-%% Handle puback
-%%--------------------------------------------------------------------
-
-handle_puback(?QOS_0, _PacketId, ReasonCode, PState) ->
-    handle_out({puberr, ReasonCode}, PState);
-handle_puback(?QOS_1, PacketId, ReasonCode, PState) ->
-    handle_out({puback, PacketId, ReasonCode}, PState);
-handle_puback(?QOS_2, PacketId, ReasonCode, PState) ->
-    handle_out({pubrec, PacketId, ReasonCode}, PState).
-
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
 
-handle_out({connack, ?RC_SUCCESS, SP}, PState = #protocol{client = Client}) ->
-    ok = emqx_hooks:run('client.connected',
-                        [Client, ?RC_SUCCESS, info(PState)]),
-    Props = #{}, %% TODO: ...
-    {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, Props), PState};
+handle_out({connack, ?RC_SUCCESS, SP},
+           PState = #protocol{client = Client = #{zone := Zone},
+                              ack_props = AckProps,
+                              alias_maximum = AliasMaximum}) ->
+    ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(PState)]),
+    #{max_packet_size := MaxPktSize,
+      max_qos_allowed := MaxQoS,
+      mqtt_retain_available := Retain,
+      max_topic_alias := MaxAlias,
+      mqtt_shared_subscription := Shared,
+      mqtt_wildcard_subscription := Wildcard
+     } = caps(PState),
+    %% Response-Information is so far not set by broker.
+    %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
+    %% According to MQTT 5.0 spec:
+    %%   A common use of this is to pass a globally unique portion of the topic tree which
+    %%   is reserved for this Client for at least the lifetime of its Session.
+    %%   This often cannot just be a random name as both the requesting Client and the
+    %%   responding Client need to be authorized to use it.
+    %% If we are to support it in the feature, the implementation should be flexible
+    %% to allow prefixing the response topic based on different ACL config.
+    %% e.g. prefix by username or client-id, so that unauthorized clients can not
+    %% subscribe requests or responses that are not intended for them.
+    AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end,
+    AckProps2 = AckProps1#{'Retain-Available' => flag(Retain),
+                          'Maximum-Packet-Size' => MaxPktSize,
+                          'Topic-Alias-Maximum' => MaxAlias,
+                          'Wildcard-Subscription-Available' => flag(Wildcard),
+                          'Subscription-Identifier-Available' => 1,
+                          %'Response-Information' =>
+                          'Shared-Subscription-Available' => flag(Shared),
+                          'Maximum-QoS' => MaxQoS
+                         },
+    AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of
+                    undefined -> AckProps2;
+                    Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive}
+                end,
+    AliasMaximum1 = set_property(inbound, MaxAlias, AliasMaximum),
+    PState1 = PState#protocol{alias_maximum = AliasMaximum1,
+                              ack_props = undefined
+                             },
+    {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps3), PState1};
 
 handle_out({connack, ReasonCode}, PState = #protocol{client = Client,
                                                      proto_ver = ProtoVer}) ->
-    ok = emqx_hooks:run('client.connected', [Client, ReasonCode, info(PState)]),
+    ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(PState)]),
     ReasonCode1 = if
                       ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode;
                       true -> emqx_reason_codes:compat(connack, ReasonCode)
@@ -298,21 +334,15 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client,
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState};
 
-handle_out({publish, Publishes}, PState = #protocol{client = Client = #{mountpoint := Mountpoint}}) ->
-    Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end,
-    Packets = lists:map(
-                fun({publish, PacketId, Msg}) ->
-                        Msg1 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
-                        Msg2 = Mount(emqx_message:update_expiry(Msg1)),
-                        emqx_packet:from_message(PacketId, Msg2)
-                end, Publishes),
+handle_out({publish, Publishes}, PState = #protocol{client = Client}) ->
+    Packets = [element(2, handle_out(Publish, PState)) || Publish <- Publishes],
     {ok, Packets, PState};
 
 handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) ->
-    Msg0 = emqx_hooks:run_fold('message.deliver', [Client], Msg),
-    Msg1 = emqx_message:update_expiry(Msg0),
-    Msg2 = emqx_mountpoint:unmount(maps:get(mountpoint, Client), Msg1),
-    {ok, emqx_packet:from_message(PacketId, Msg2), PState};
+    Msg1 = emqx_hooks:run_fold('message.deliver', [Client],
+                               emqx_message:update_expiry(Msg)),
+    Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)),
+    {ok, Packet, PState};
 
 %% TODO: How to handle the err?
 handle_out({puberr, _ReasonCode}, PState) ->
@@ -393,40 +423,20 @@ validate_in(Packet, _PState) ->
     end.
 
 %%--------------------------------------------------------------------
-%% PreProcess Properties
+%% Preprocess properties
 %%--------------------------------------------------------------------
 
-preprocess_props(#mqtt_packet_connect{
-                    properties = #{'Topic-Alias-Maximum' := Max}
-                   },
-                 PState = #protocol{alias_maximum = AliasMaximum}) ->
-    {ok, PState#protocol{alias_maximum = AliasMaximum#{outbound => Max}}};
-
-preprocess_props(Packet = #mqtt_packet{variable = Publish}, PState) ->
-    case preprocess_props(Publish, PState) of
-        {ok, Publish1, PState1} ->
-            {ok, Packet#mqtt_packet{variable = Publish1}, PState1};
-        Error -> Error
-    end;
-
-preprocess_props(Publish = #mqtt_packet_publish{topic_name = <<>>,
-                                                properties = #{'Topic-Alias' := AliasId}
-                                               },
-                 PState = #protocol{topic_aliases = TopicAliases}) ->
-    case maps:find(AliasId, TopicAliases) of
-        {ok, Topic} ->
-            {ok, Publish#mqtt_packet_publish{topic_name = Topic}, PState};
-        false -> {error, ?RC_TOPIC_ALIAS_INVALID}
-    end;
-
-preprocess_props(Publish = #mqtt_packet_publish{topic_name = Topic,
-                                                properties = #{'Topic-Alias' := AliasId}
-                                               },
-                 PState = #protocol{topic_aliases = Aliases}) ->
-    Aliases1 = maps:put(AliasId, Topic, Aliases),
-    {ok, Publish, PState#protocol{topic_aliases = Aliases1}};
+process_props(#mqtt_packet_connect{
+                 properties = #{'Topic-Alias-Maximum' := Max}
+                },
+              PState = #protocol{alias_maximum = AliasMaximum}) ->
+    NAliasMaximum = if AliasMaximum == undefined ->
+                           #{outbound => Max};
+                       true -> AliasMaximum#{outbound => Max}
+                    end,
+    {ok, PState#protocol{alias_maximum = NAliasMaximum}};
 
-preprocess_props(Packet, PState) ->
+process_props(Packet, PState) ->
     {ok, Packet, PState}.
 
 %%--------------------------------------------------------------------
@@ -434,12 +444,15 @@ preprocess_props(Packet, PState) ->
 %%--------------------------------------------------------------------
 
 check_connect(ConnPkt, PState) ->
-    pipeline([fun check_proto_ver/2,
-              fun check_client_id/2,
-              %%fun check_flapping/2,
-              fun check_banned/2,
-              fun check_will_topic/2,
-              fun check_will_retain/2], ConnPkt, PState).
+    case pipeline([fun check_proto_ver/2,
+                   fun check_client_id/2,
+                   %%fun check_flapping/2,
+                   fun check_banned/2,
+                   fun check_will_topic/2,
+                   fun check_will_retain/2], ConnPkt, PState) of
+        ok -> {ok, PState};
+        Error -> Error
+    end.
 
 check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
                                      proto_name = Name}, _PState) ->
@@ -450,16 +463,19 @@ check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
 
 %% MQTT3.1 does not allow null clientId
 check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
-                                     client_id = <<>>}, _PState) ->
+                                     client_id = <<>>
+                                    }, _PState) ->
     {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
 
 %% Issue#599: Null clientId and clean_start = false
 check_client_id(#mqtt_packet_connect{client_id   = <<>>,
                                      clean_start = false}, _PState) ->
     {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
+
 check_client_id(#mqtt_packet_connect{client_id   = <<>>,
                                      clean_start = true}, _PState) ->
     ok;
+
 check_client_id(#mqtt_packet_connect{client_id = ClientId},
                 #protocol{client = #{zone := Zone}}) ->
     Len = byte_size(ClientId),
@@ -469,6 +485,7 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId},
         false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
     end.
 
+%%TODO: check banned...
 check_banned(#mqtt_packet_connect{client_id = ClientId,
                                   username = Username},
              #protocol{client = Client = #{zone := Zone}}) ->
@@ -501,25 +518,32 @@ check_will_retain(#mqtt_packet_connect{will_retain = true},
     end.
 
 %%--------------------------------------------------------------------
-%% Enrich state
+%% Enrich client
 %%--------------------------------------------------------------------
 
-enrich_pstate(#mqtt_packet_connect{proto_name = ProtoName,
-                                   proto_ver  = ProtoVer,
-                                   keepalive  = Keepalive,
-                                   client_id  = ClientId,
+enrich_client(#mqtt_packet_connect{client_id  = ClientId,
                                    username   = Username,
                                    is_bridge  = IsBridge
                                   },
               PState = #protocol{client = Client}) ->
-    Client1 = maps:merge(Client, #{client_id => ClientId,
-                                   username  => Username,
-                                   is_bridge => IsBridge
-                                  }),
-    {ok, PState#protocol{proto_name = ProtoName,
-                         proto_ver  = ProtoVer,
-                         client     = Client1,
-                         keepalive  = Keepalive}}.
+    Client1 = set_username(Username, Client#{client_id => ClientId,
+                                             is_bridge => IsBridge
+                                            }),
+    {ok, PState#protocol{client = maybe_username_as_clientid(Client1)}}.
+
+%% Username maybe not undefined if peer_cert_as_username
+set_username(Username, Client = #{username := undefined}) ->
+    Client#{username => Username};
+set_username(_Username, Client) -> Client.
+
+maybe_username_as_clientid(Client = #{username := undefined}) ->
+    Client;
+maybe_username_as_clientid(Client = #{zone := Zone,
+                                      username := Username}) ->
+    case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
+        true -> Client#{client_id => Username};
+        false -> Client
+    end.
 
 %%--------------------------------------------------------------------
 %% Auth Connect
@@ -529,25 +553,39 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId,
                                   username  = Username,
                                   password  = Password},
              PState = #protocol{client = Client}) ->
-    case emqx_access_control:authenticate(Client#{password => Password}) of
+    case authenticate(Client#{password => Password}) of
         {ok, AuthResult} ->
             {ok, PState#protocol{client = maps:merge(Client, AuthResult)}};
         {error, Reason} ->
             ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p",
                  [ClientId, Username, Reason]),
-            {error, Reason}
+            {error, emqx_reason_codes:connack_error(Reason)}
     end.
 
 %%--------------------------------------------------------------------
-%% Handle Connect
+%% Assign a random clientId
 %%--------------------------------------------------------------------
 
-handle_connect(ConnPkt, PState) ->
+maybe_assign_clientid(PState = #protocol{client = Client = #{client_id := <<>>},
+                                         ack_props = AckProps}) ->
+    ClientId = emqx_guid:to_base62(emqx_guid:gen()),
+    Client1 = Client#{client_id => ClientId},
+    AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
+    PState#protocol{client = Client1, ack_props = AckProps1};
+maybe_assign_clientid(PState) -> PState.
+
+%%--------------------------------------------------------------------
+%% Process Connect
+%%--------------------------------------------------------------------
+
+process_connect(ConnPkt, PState) ->
     case open_session(ConnPkt, PState) of
         {ok, Session, SP} ->
             WillMsg = emqx_packet:will_msg(ConnPkt),
-            handle_out({connack, ?RC_SUCCESS, sp(SP)},
-                       PState#protocol{session = Session, will_msg = WillMsg});
+            NPState = PState#protocol{session = Session,
+                                      will_msg = WillMsg
+                                     },
+            handle_out({connack, ?RC_SUCCESS, sp(SP)}, NPState);
         {error, Reason} ->
             %% TODO: Unknown error?
             ?LOG(error, "Failed to open session: ~p", [Reason]),
@@ -561,24 +599,65 @@ handle_connect(ConnPkt, PState) ->
 open_session(#mqtt_packet_connect{clean_start = CleanStart,
                                   properties  = ConnProps},
              #protocol{client = Client = #{zone := Zone}}) ->
-    MaxInflight = maps:get('Receive-Maximum', ConnProps,
-                           emqx_zone:get_env(Zone, max_inflight, 65535)),
-    Interval = maps:get('Session-Expiry-Interval', ConnProps,
-                        emqx_zone:get_env(Zone, session_expiry_interval, 0)),
+    MaxInflight = get_property('Receive-Maximum', ConnProps,
+                               emqx_zone:get_env(Zone, max_inflight, 65535)),
+    Interval = get_property('Session-Expiry-Interval', ConnProps,
+                            emqx_zone:get_env(Zone, session_expiry_interval, 0)),
     emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight,
                                                expiry_interval => Interval
                                               }).
 
 %%--------------------------------------------------------------------
-%% Handle Publish Message: Client -> Broker
+%% Process publish message: Client -> Broker
 %%--------------------------------------------------------------------
 
+process_alias(Packet = #mqtt_packet{
+                          variable = #mqtt_packet_publish{topic_name = <<>>,
+                                                          properties = #{'Topic-Alias' := AliasId}
+                                                         } = Publish
+                         }, PState = #protocol{topic_aliases = Aliases}) ->
+    case find_alias(AliasId, Aliases) of
+        {ok, Topic} ->
+            {ok, Packet#mqtt_packet{
+                   variable = Publish#mqtt_packet_publish{
+                                topic_name = Topic}}, PState};
+        false -> {error, ?RC_TOPIC_ALIAS_INVALID}
+    end;
+
+process_alias(#mqtt_packet{
+                 variable = #mqtt_packet_publish{topic_name = Topic,
+                                                 properties = #{'Topic-Alias' := AliasId}
+                                                }
+                }, PState = #protocol{topic_aliases = Aliases}) ->
+    {ok, PState#protocol{topic_aliases = save_alias(AliasId, Topic, Aliases)}};
+
+process_alias(_Packet, PState) ->
+    {ok, PState}.
+
+find_alias(_AliasId, undefined) ->
+    false;
+find_alias(AliasId, Aliases) ->
+    maps:find(AliasId, Aliases).
+
+save_alias(AliasId, Topic, undefined) ->
+    #{AliasId => Topic};
+save_alias(AliasId, Topic, Aliases) ->
+    maps:put(AliasId, Topic, Aliases).
 
 %% Check Publish
 check_publish(Packet, PState) ->
-    pipeline([fun check_pub_alias/2,
-              fun check_pub_caps/2,
-              fun check_pub_acl/2], Packet, PState).
+    pipeline([fun check_pub_acl/2,
+              fun check_pub_alias/2,
+              fun check_pub_caps/2], Packet, PState).
+
+%% Check Pub ACL
+check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
+              #protocol{client = Client}) ->
+    case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of
+        false -> ok;
+        allow -> ok;
+        deny  -> {error, ?RC_NOT_AUTHORIZED}
+    end.
 
 %% Check Pub Alias
 check_pub_alias(#mqtt_packet{
@@ -590,52 +669,41 @@ check_pub_alias(#mqtt_packet{
     case (Limits == undefined)
             orelse (Max = maps:get(inbound, Limits, 0)) == 0
                 orelse (AliasId > Max) of
-        true  -> {error, ?RC_TOPIC_ALIAS_INVALID};
-        false -> ok
+        false -> ok;
+        true  -> {error, ?RC_TOPIC_ALIAS_INVALID}
     end;
 check_pub_alias(_Packet, _PState) -> ok.
 
 %% Check Pub Caps
 check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
                                                          retain = Retain
-                                                        }},
+                                                        }
+                           },
                #protocol{client = #{zone := Zone}}) ->
     emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
 
-%% Check Pub ACL
-check_pub_acl(_Packet, #protocol{enable_acl = false}) ->
-    ok;
-check_pub_acl(_Packet, #protocol{client = #{is_superuser := true}}) ->
-    ok;
-check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
-              #protocol{client = Client}) ->
-    case emqx_access_control:check_acl(Client, publish, Topic) of
-        allow -> ok;
-        deny  -> {error, ?RC_NOT_AUTHORIZED}
-    end.
+%% Process Publish
+process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
+                PState = #protocol{client = Client}) ->
+    Msg = emqx_packet:to_message(Client, Packet),
+    %%TODO: Improve later.
+    Msg1 = emqx_message:set_flag(dup, false, Msg),
+    process_publish(PacketId, mount(Client, Msg1), PState).
 
-handle_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
-               PState = #protocol{client = Client = #{mountpoint := MountPoint}}) ->
-    %% TODO: ugly... publish_to_msg(...)
-    Mount = fun(Msg) -> emqx_mountpoint:mount(MountPoint, Msg) end,
-    Msg1 = emqx_packet:to_message(Client, Packet),
-    Msg2 = Mount(emqx_message:set_flag(dup, false, Msg1)),
-    handle_publish(PacketId, Msg2, PState).
-
-handle_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) ->
+process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) ->
     _ = emqx_broker:publish(Msg),
     {ok, PState};
 
-handle_publish(PacketId, Msg = #message{qos = ?QOS_1}, PState) ->
-    Results = emqx_broker:publish(Msg),
-    ReasonCode = emqx_reason_codes:puback(Results),
+process_publish(PacketId, Msg = #message{qos = ?QOS_1}, PState) ->
+    Deliveries = emqx_broker:publish(Msg),
+    ReasonCode = emqx_reason_codes:puback(Deliveries),
     handle_out({puback, PacketId, ReasonCode}, PState);
 
-handle_publish(PacketId, Msg = #message{qos = ?QOS_2},
-               PState = #protocol{session = Session}) ->
+process_publish(PacketId, Msg = #message{qos = ?QOS_2},
+                PState = #protocol{session = Session}) ->
     case emqx_session:publish(PacketId, Msg, Session) of
-        {ok, Results, NSession} ->
-            ReasonCode = emqx_reason_codes:puback(Results),
+        {ok, Deliveries, NSession} ->
+            ReasonCode = emqx_reason_codes:puback(Deliveries),
             handle_out({pubrec, PacketId, ReasonCode},
                        PState#protocol{session = NSession});
         {error, ReasonCode} ->
@@ -643,35 +711,40 @@ handle_publish(PacketId, Msg = #message{qos = ?QOS_2},
     end.
 
 %%--------------------------------------------------------------------
-%% Handle Subscribe Request
+%% Puback
+%%--------------------------------------------------------------------
+
+puback(?QOS_0, _PacketId, ReasonCode, PState) ->
+    handle_out({puberr, ReasonCode}, PState);
+puback(?QOS_1, PacketId, ReasonCode, PState) ->
+    handle_out({puback, PacketId, ReasonCode}, PState);
+puback(?QOS_2, PacketId, ReasonCode, PState) ->
+    handle_out({pubrec, PacketId, ReasonCode}, PState).
+
+%%--------------------------------------------------------------------
+%% Process subscribe request
 %%--------------------------------------------------------------------
 
-handle_subscribe(TopicFilters, PState) ->
-    handle_subscribe(TopicFilters, [], PState).
+process_subscribe(TopicFilters, PState) ->
+    process_subscribe(TopicFilters, [], PState).
 
-handle_subscribe([], Acc, PState) ->
+process_subscribe([], Acc, PState) ->
     {lists:reverse(Acc), PState};
 
-handle_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
-    {RC, PState1} = do_subscribe(TopicFilter, SubOpts, PState),
-    handle_subscribe(More, [RC|Acc], PState1).
+process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
+    {RC, NPState} = do_subscribe(TopicFilter, SubOpts, PState),
+    process_subscribe(More, [RC|Acc], NPState).
 
 do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
-             PState = #protocol{client  = Client = #{mountpoint := Mountpoint},
-                                session = Session}) ->
-    Mount = fun(Msg) -> emqx_mountpoint:mount(Mountpoint, Msg) end,
-    %% 1. Parse 2. Check 3. Enrich 5. MountPoint 6. Session
-    SubOpts1 = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
-    {TopicFilter1, SubOpts2} = emqx_topic:parse(TopicFilter, SubOpts1),
-    SubOpts3 = enrich_subopts(SubOpts2, PState),
-    case check_subscribe(TopicFilter1, PState) of
-        {ok, _, _} -> %% TODO:...
-            TopicFilter2 = Mount(TopicFilter1),
-            case emqx_session:subscribe(Client, TopicFilter2, SubOpts3, Session) of
-                {ok, NSession} ->
-                    {QoS, PState#protocol{session = NSession}};
-                {error, RC} -> {RC, PState}
-            end;
+             PState = #protocol{client = Client, session = Session}) ->
+    case check_subscribe(TopicFilter, PState) of
+        ok -> TopicFilter1 = mount(Client, TopicFilter),
+              SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState),
+              case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
+                  {ok, NSession} ->
+                      {QoS, PState#protocol{session = NSession}};
+                  {error, RC} -> {RC, PState}
+              end;
         {error, RC} -> {RC, PState}
     end.
 
@@ -682,55 +755,59 @@ enrich_subid(_Properties, TopicFilters) ->
 
 enrich_subopts(SubOpts, #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
     SubOpts;
-enrich_subopts(SubOpts, #protocol{client = #{zone := Zone,
-                                             is_bridge := IsBridge}}) ->
+enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge}}) ->
     Rap = flag(IsBridge),
     Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)),
     SubOpts#{rap => Rap, nl => Nl}.
 
+%% Check Sub
 check_subscribe(TopicFilter, PState) ->
-    pipeline([%%TODO: fun check_sub_caps/2,
-              fun check_sub_acl/2], TopicFilter, PState).
-
-%% Check Sub Caps
-check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) ->
-    emqx_mqtt_caps:check_sub(Zone, TopicFilter).
+    case check_sub_acl(TopicFilter, PState) of
+        allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState);
+        deny  -> {error, ?RC_NOT_AUTHORIZED}
+    end.
 
 %% Check Sub ACL
-check_sub_acl(_TopicFilter, #protocol{enable_acl = false}) ->
-    ok;
-check_sub_acl(_TopicFilter, #protocol{client = #{is_superuser := true}}) ->
-    ok;
 check_sub_acl(TopicFilter, #protocol{client = Client}) ->
-    case emqx_access_control:check_acl(Client, subscribe, TopicFilter) of
-        allow -> ok;
-        deny  -> {error, ?RC_NOT_AUTHORIZED}
+    case is_acl_enabled(Client) andalso
+         check_acl(Client, subscribe, TopicFilter) of
+        false  -> allow;
+        Result -> Result
     end.
 
+%% Check Sub Caps
+check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) ->
+    emqx_mqtt_caps:check_sub(Zone, TopicFilter).
+
 %%--------------------------------------------------------------------
-%% Handle Unsubscribe Request
+%% Process unsubscribe request
 %%--------------------------------------------------------------------
 
-handle_unsubscribe(TopicFilters, PState) ->
-    handle_unsubscribe(TopicFilters, [], PState).
+process_unsubscribe(TopicFilters, PState) ->
+    process_unsubscribe(TopicFilters, [], PState).
 
-handle_unsubscribe([], Acc, PState) ->
+process_unsubscribe([], Acc, PState) ->
     {lists:reverse(Acc), PState};
 
-handle_unsubscribe([TopicFilter|More], Acc, PState) ->
-    {RC, PState1} = do_unsubscribe(TopicFilter, PState),
-    handle_unsubscribe(More, [RC|Acc], PState1).
+process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
+    {RC, PState1} = do_unsubscribe(TopicFilter, SubOpts, PState),
+    process_unsubscribe(More, [RC|Acc], PState1).
 
-do_unsubscribe(TopicFilter, PState = #protocol{client   = Client = #{mountpoint := Mountpoint},
-                                               session  = Session}) ->
-    Mount = fun(Topic) -> emqx_mountpoint:mount(Mountpoint, Topic) end,
-    TopicFilter1 = Mount(element(1, emqx_topic:parse(TopicFilter))),
-    case emqx_session:unsubscribe(Client, TopicFilter1, Session) of
+do_unsubscribe(TopicFilter, _SubOpts, PState = #protocol{client = Client,
+                                                         session = Session}) ->
+    case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of
         {ok, NSession} ->
             {?RC_SUCCESS, PState#protocol{session = NSession}};
         {error, RC} -> {RC, PState}
     end.
 
+%%--------------------------------------------------------------------
+%% Is ACL enabled?
+%%--------------------------------------------------------------------
+
+is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
+    (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true).
+
 %%--------------------------------------------------------------------
 %% Pipeline
 %%--------------------------------------------------------------------
@@ -745,16 +822,36 @@ pipeline([Fun|More], Packet, PState) ->
             pipeline(More, Packet, NPState);
         {ok, NPacket, NPState} ->
             pipeline(More, NPacket, NPState);
-        {error, Reason} ->
-            {error, Reason, PState};
-        {error, Reason, NPState} ->
-            {error, Reason, NPState}
+        {error, ReasonCode} ->
+            {error, ReasonCode, PState};
+        {error, ReasonCode, NPState} ->
+            {error, ReasonCode, NPState}
     end.
 
+%%--------------------------------------------------------------------
+%% Mount/Unmount
+%%--------------------------------------------------------------------
+
+mount(#{mountpoint := MountPoint}, TopicOrMsg) ->
+    emqx_mountpoint:mount(MountPoint, TopicOrMsg).
+
+unmount(#{mountpoint := MountPoint}, TopicOrMsg) ->
+    emqx_mountpoint:unmount(MountPoint, TopicOrMsg).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
+set_property(Name, Value, ?NO_PROPS) ->
+    #{Name => Value};
+set_property(Name, Value, Props) ->
+    Props#{Name => Value}.
+
+get_property(_Name, undefined, Default) ->
+    Default;
+get_property(Name, Props, Default) ->
+    maps:get(Name, Props, Default).
+
 sp(true)  -> 1;
 sp(false) -> 0.
 

+ 6 - 2
src/emqx_session.erl

@@ -136,6 +136,8 @@
 
 -opaque(session() :: #session{}).
 
+-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
+
 -define(DEFAULT_BATCH_N, 1000).
 
 %%--------------------------------------------------------------------
@@ -334,7 +336,8 @@ do_publish(PacketId, Msg = #message{timestamp = Ts},
 %%--------------------------------------------------------------------
 
 -spec(puback(emqx_types:packet_id(), emqx_types:reason_code(), session())
-      -> {ok, session()} | {error, emqx_types:reason_code()}).
+      -> {ok, session()} | {ok, list(publish()), session()} |
+         {error, emqx_types:reason_code()}).
 puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
@@ -388,7 +391,8 @@ pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) ->
 %%--------------------------------------------------------------------
 
 -spec(pubcomp(emqx_types:packet_id(), emqx_types:reason_code(), session())
-      -> {ok, session()} | {error, emqx_types:reason_code()}).
+      -> {ok, session()} | {ok, list(publish()), session()} |
+         {error, emqx_types:reason_code()}).
 pubcomp(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:contain(PacketId, Inflight) of
         true ->