Pārlūkot izejas kodu

Merge branch 'emqx30-dev' into emqx30

Feng Lee 7 gadi atpakaļ
vecāks
revīzija
619d39b2fb

+ 54 - 0
TODO

@@ -1,4 +1,58 @@
 
+## MQTT 5.0
+
+1. Topic Alias
+2. Subscriber ID
+3. Session ensure stats
+4. Message Expiration
+
+## Connection
+
+## WebSocket
+
+## Listeners
+
+## Protocol
+
+1. Global ACL cache with limited age and size?
+2. Whether to enable ACL for each zone?
+
+## Session
+
+## Bridges
+
+Config
+CLI
+Remote Bridge
+replay queue
+
+## Access Control
+
+  Global ACL Cache
+  Add ACL cache emqx_access_control module
+
+## Zone
+
+## Hooks
+
+The hooks design...
+
+## MQueue
+
+Bound Queue
+LastValue Queue
+Priority Queue
+
+## Supervisor tree
+
+KernelSup
+
+## Managment
+
+## Dashboard
+
+## Testcases
+
 1. Update the README.md
 2. Update the Documentation
 3. Shared subscription and dispatch strategy

BIN
docs/mqtt-v5.0.pdf


Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 190 - 183
etc/emqx.conf


+ 14 - 11
include/emqx.hrl

@@ -53,7 +53,9 @@
 
 -type(subid() :: binary() | atom()).
 
--type(subopts() :: #{qos => integer(), share => '$queue' | binary(), atom() => term()}).
+-type(subopts() :: #{qos    => integer(),
+                     share  => '$queue' | binary(),
+                     atom() => term()}).
 
 -record(subscription, {
           topic   :: topic(),
@@ -82,20 +84,21 @@
 -type(zone() :: atom()).
 
 -record(client, {
-          id          :: client_id(),
-          pid         :: pid(),
-          zone        :: zone(),
-          protocol    :: protocol(),
-          peername    :: peername(),
-          peercert    :: nossl | binary(),
-          username    :: username(),
-          clean_start :: boolean(),
-          attributes  :: map()
+          id         :: client_id(),
+          pid        :: pid(),
+          zone       :: zone(),
+          peername   :: peername(),
+          username   :: username(),
+          protocol   :: protocol(),
+          attributes :: map()
          }).
 
 -type(client() :: #client{}).
 
--record(session, {sid :: client_id(), pid :: pid()}).
+-record(session, {
+          sid :: client_id(),
+          pid :: pid()
+         }).
 
 -type(session() :: #session{}).
 

+ 31 - 6
include/emqx_mqtt.hrl

@@ -229,10 +229,17 @@
 
 -type(mqtt_properties() :: #{atom() => term()} | undefined).
 
-%% nl: no local, rap: retain as publish, rh: retain handling
--record(mqtt_subopts, {rh = 0, rap = 0, nl = 0, qos = ?QOS_0}).
+-type(mqtt_subopts() :: #{atom() => term()}).
 
--type(mqtt_subopts() :: #mqtt_subopts{}).
+-define(DEFAULT_SUBOPTS, #{rh  => 0,  %% Retain Handling
+                           rap => 0,  %% Retain as Publish
+                           nl  => 0,  %% No Local
+                           qos => ?QOS_0,
+                           rc  => 0,  %% Reason Code
+                           subid => 0 %% Subscription-Identifier
+                          }).
+
+-type(mqtt_topic_filters() :: [{mqtt_topic(), mqtt_subopts()}]).
 
 -record(mqtt_packet_connect, {
           proto_name   = <<"MQTT">>     :: binary(),
@@ -273,7 +280,7 @@
 -record(mqtt_packet_subscribe, {
           packet_id     :: mqtt_packet_id(),
           properties    :: mqtt_properties(),
-          topic_filters :: [{mqtt_topic(), mqtt_subopts()}]
+          topic_filters :: mqtt_topic_filters()
         }).
 
 -record(mqtt_packet_suback, {
@@ -391,6 +398,11 @@
                  variable = #mqtt_packet_puback{packet_id   = PacketId,
                                                 reason_code = 0}}).
 
+-define(PUBACK_PACKET(PacketId, ReasonCode),
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBACK},
+                 variable = #mqtt_packet_puback{packet_id   = PacketId,
+                                                reason_code = ReasonCode}}).
+
 -define(PUBACK_PACKET(PacketId, ReasonCode, Properties),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBACK},
                  variable = #mqtt_packet_puback{packet_id   = PacketId,
@@ -399,8 +411,13 @@
 
 -define(PUBREC_PACKET(PacketId),
         #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREC},
-                 variable = #mqtt_packet_puback{packet_id   = PacketId,
-                                                reason_code = 0}}).
+                     variable = #mqtt_packet_puback{packet_id   = PacketId,
+                                                    reason_code = 0}}).
+
+-define(PUBREC_PACKET(PacketId, ReasonCode),
+        #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREC},
+                     variable = #mqtt_packet_puback{packet_id   = PacketId,
+                                                    reason_code = ReasonCode}}).
 
 -define(PUBREC_PACKET(PacketId, ReasonCode, Properties),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREC},
@@ -412,6 +429,10 @@
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
                  variable = #mqtt_packet_puback{packet_id   = PacketId,
                                                 reason_code = 0}}).
+-define(PUBREL_PACKET(PacketId, ReasonCode),
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
+                 variable = #mqtt_packet_puback{packet_id   = PacketId,
+                                                reason_code = ReasonCode}}).
 
 -define(PUBREL_PACKET(PacketId, ReasonCode, Properties),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
@@ -423,6 +444,10 @@
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBCOMP},
                  variable = #mqtt_packet_puback{packet_id   = PacketId,
                                                 reason_code = 0}}).
+-define(PUBCOMP_PACKET(PacketId, ReasonCode),
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBCOMP},
+                 variable = #mqtt_packet_puback{packet_id   = PacketId,
+                                                reason_code = ReasonCode}}).
 
 -define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBCOMP},

+ 171 - 157
priv/emqx.schema

@@ -559,6 +559,12 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%% @doc ACL nomatch.
+{mapping, "acl_nomatch", "emqx.acl_nomatch", [
+  {default, deny},
+  {datatype, {enum, [allow, deny]}}
+]}.
+
 %% @doc Default ACL file.
 {mapping, "acl_file", "emqx.acl_file", [
   {datatype, string},
@@ -584,7 +590,7 @@ end}.
 %% ]}.
 
 %%--------------------------------------------------------------------
-%% MQTT
+%% MQTT Protocol
 %%--------------------------------------------------------------------
 
 %% @doc Max Packet Size Allowed, 1MB by default.
@@ -636,6 +642,170 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%%--------------------------------------------------------------------
+%% Zones
+%%--------------------------------------------------------------------
+
+%% @doc Idle timeout of the MQTT connection.
+{mapping, "zone.$name.idle_timeout", "emqx.zones", [
+  {default, "15s"},
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "zone.$name.allow_anonymous", "emqx.zones", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "zone.$name.acl_nomatch", "emqx.zones", [
+  {datatype, {enum, [allow, deny]}}
+]}.
+
+%% @doc Enable ACL check.
+{mapping, "zone.$name.enable_acl", "emqx.zones", [
+  {default, off},
+  {datatype, flag}
+]}.
+
+%% @doc Enable per connection statistics.
+{mapping, "zone.$name.enable_stats", "emqx.zones", [
+  {default, off},
+  {datatype, flag}
+]}.
+
+%% @doc Publish limit of the MQTT connections.
+{mapping, "zone.$name.publish_limit", "emqx.zones", [
+  {default, undefined},
+  {datatype, string}
+]}.
+
+%% @doc Max Packet Size Allowed, 64K by default.
+{mapping, "zone.$name.max_packet_size", "emqx.zones", [
+  {datatype, bytesize}
+]}.
+
+%% @doc Set the Max ClientId Length Allowed.
+{mapping, "zone.$name.max_clientid_len", "emqx.zones", [
+  {datatype, integer}
+]}.
+
+%% @doc Set the Maximum topic levels.
+{mapping, "zone.$name.max_topic_levels", "emqx.zones", [
+  {datatype, integer}
+]}.
+
+%% @doc Set the Maximum QoS allowed.
+{mapping, "zone.$name.max_qos_allowed", "emqx.zones", [
+  {datatype, integer},
+  {validators, ["range:0-2"]}
+]}.
+
+%% @doc Set the Maximum topic alias.
+{mapping, "zone.$name.max_topic_alias", "emqx.zones", [
+  {datatype, integer}
+]}.
+
+%% @doc Whether the server supports retained messages.
+{mapping, "zone.$name.retain_available", "emqx.zones", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Whether the Server supports Wildcard Subscriptions.
+{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Whether the Server supports Shared Subscriptions.
+{mapping, "zone.$name.shared_subscription", "emqx.zones", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Keepalive backoff
+{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
+  {default, 0.75},
+  {datatype, float}
+]}.
+
+%% @doc Max Number of Subscriptions Allowed.
+{mapping, "zone.$name.max_subscriptions", "emqx.zones", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Upgrade QoS according to subscription?
+{mapping, "zone.$name.upgrade_qos", "emqx.zones", [
+  {default, off},
+  {datatype, flag}
+]}.
+
+%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
+%% 0 means no limit
+{mapping, "zone.$name.max_inflight", "emqx.zones", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Retry interval for redelivering QoS1/2 messages.
+{mapping, "zone.$name.retry_interval", "emqx.zones", [
+  {default, "20s"},
+  {datatype, {duration, ms}}
+]}.
+
+%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
+{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Awaiting PUBREL timeout
+{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
+  {default, "60s"},
+  {datatype, {duration, ms}}
+]}.
+
+%% @doc Ignore loop delivery of messages
+{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Session Expiry Interval
+{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [
+  {default, "2h"},
+  {datatype, {duration, ms}}
+]}.
+
+%% @doc Max queue length. Enqueued messages when persistent client
+%% disconnected, or inflight window is full. 0 means no limit.
+{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
+  {default, 1000},
+  {datatype, integer}
+]}.
+
+%% @doc Queue Qos0 messages?
+{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqx.zones", fun(Conf) ->
+  Mapping = fun("retain_available", Val) ->
+                    {mqtt_retain_available, Val};
+               ("wildcard_subscription", Val) ->
+                    {mqtt_wildcard_subscription, Val};
+               ("shared_subscription", Val) ->
+                    {mqtt_shared_subscription, Val};
+               (Opt, Val) ->
+                    {list_to_atom(Opt), Val}
+            end,
+  maps:to_list(
+    lists:foldl(
+      fun({["zone", Name, Opt], Val}, Zones) ->
+              maps:update_with(list_to_atom(Name),
+                               fun(Opts) -> [Mapping(Opt, Val)|Opts] end,
+                               [Mapping(Opt, Val)], Zones)
+      end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
+end}.
+
 %%--------------------------------------------------------------------
 %% Listeners
 %%--------------------------------------------------------------------
@@ -1234,162 +1404,6 @@ end}.
                                                ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
 end}.
 
-%%--------------------------------------------------------------------
-%% Zones
-%%--------------------------------------------------------------------
-
-%% @doc Idle timeout of the MQTT connection.
-{mapping, "zone.$name.idle_timeout", "emqx.zones", [
-  {default, "15s"},
-  {datatype, {duration, ms}}
-]}.
-
-%% @doc Enable ACL check.
-{mapping, "zone.$name.enable_acl", "emqx.zones", [
-  {default, off},
-  {datatype, flag}
-]}.
-
-%% @doc Enable per connection statistics.
-{mapping, "zone.$name.enable_stats", "emqx.zones", [
-  {default, off},
-  {datatype, flag}
-]}.
-
-%% @doc Publish limit of the MQTT connections.
-{mapping, "zone.$name.publish_limit", "emqx.zones", [
-  {default, undefined},
-  {datatype, string}
-]}.
-
-%% @doc Max Packet Size Allowed, 64K by default.
-{mapping, "zone.$name.max_packet_size", "emqx.zones", [
-  {datatype, bytesize}
-]}.
-
-%% @doc Set the Max ClientId Length Allowed.
-{mapping, "zone.$name.max_clientid_len", "emqx.zones", [
-  {datatype, integer}
-]}.
-
-%% @doc Set the Maximum topic levels.
-{mapping, "zone.$name.max_topic_levels", "emqx.zones", [
-  {datatype, integer}
-]}.
-
-%% @doc Set the Maximum QoS allowed.
-{mapping, "zone.$name.max_qos_allowed", "emqx.zones", [
-  {datatype, integer},
-  {validators, ["range:0-2"]}
-]}.
-
-%% @doc Set the Maximum topic alias.
-{mapping, "zone.$name.max_topic_alias", "emqx.zones", [
-  {datatype, integer}
-]}.
-
-%% @doc Whether the server supports retained messages.
-{mapping, "zone.$name.retain_available", "emqx.zones", [
-  {datatype, {enum, [true, false]}}
-]}.
-
-%% @doc Whether the Server supports Wildcard Subscriptions.
-{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [
-  {datatype, {enum, [true, false]}}
-]}.
-
-%% @doc Whether the Server supports Shared Subscriptions.
-{mapping, "zone.$name.shared_subscription", "emqx.zones", [
-  {datatype, {enum, [true, false]}}
-]}.
-
-%% @doc Keepalive backoff
-{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
-  {default, 0.75},
-  {datatype, float}
-]}.
-
-%% @doc Max Number of Subscriptions Allowed.
-{mapping, "zone.$name.max_subscriptions", "emqx.zones", [
-  {default, 0},
-  {datatype, integer}
-]}.
-
-%% @doc Upgrade QoS according to subscription?
-{mapping, "zone.$name.upgrade_qos", "emqx.zones", [
-  {default, off},
-  {datatype, flag}
-]}.
-
-%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
-%% 0 means no limit
-{mapping, "zone.$name.max_inflight", "emqx.zones", [
-  {default, 0},
-  {datatype, integer}
-]}.
-
-%% @doc Retry interval for redelivering QoS1/2 messages.
-{mapping, "zone.$name.retry_interval", "emqx.zones", [
-  {default, "20s"},
-  {datatype, {duration, ms}}
-]}.
-
-%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
-{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
-  {default, 0},
-  {datatype, integer}
-]}.
-
-%% @doc Awaiting PUBREL timeout
-{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
-  {default, "60s"},
-  {datatype, {duration, ms}}
-]}.
-
-%% @doc Ignore message from self publish
-{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [
-  {default, false},
-  {datatype, {enum, [true, false]}}
-]}.
-
-%% @doc Session Expiry Interval
-{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [
-  {default, "2h"},
-  {datatype, {duration, ms}}
-]}.
-
-%% @doc Max queue length. Enqueued messages when persistent client
-%% disconnected, or inflight window is full. 0 means no limit.
-{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
-  {default, 1000},
-  {datatype, integer}
-]}.
-
-%% @doc Queue Qos0 messages?
-{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
-  {default, true},
-  {datatype, {enum, [true, false]}}
-]}.
-
-{translation, "emqx.zones", fun(Conf) ->
-  Mapping = fun("retain_available", Val) ->
-                    {mqtt_retain_available, Val};
-               ("wildcard_subscription", Val) ->
-                    {mqtt_wildcard_subscription, Val};
-               ("shared_subscription", Val) ->
-                    {mqtt_shared_subscription, Val};
-               (Opt, Val) ->
-                    {list_to_atom(Opt), Val}
-            end,
-  maps:to_list(
-    lists:foldl(
-      fun({["zone", Name, Opt], Val}, Zones) ->
-              maps:update_with(list_to_atom(Name),
-                               fun(Opts) -> [Mapping(Opt, Val)|Opts] end,
-                               [Mapping(Opt, Val)], Zones)
-      end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
-end}.
-
 %%--------------------------------------------------------------------
 %% Bridges
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqx.app.src

@@ -3,7 +3,7 @@
               {vsn,"3.0"},
               {modules,[]},
               {registered,[emqx_sup]},
-              {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,minirest]},
+              {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy]},
               {env,[]},
               {mod,{emqx_app,[]}},
               {maintainers,["Feng Lee <feng@emqx.io>"]},

+ 25 - 24
src/emqx_access_control.erl

@@ -18,15 +18,16 @@
 
 -include("emqx.hrl").
 
-%% API Function Exports
--export([start_link/0, auth/2, check_acl/3, reload_acl/0, lookup_mods/1,
-         register_mod/3, register_mod/4, unregister_mod/2, stop/0]).
-
+-export([start_link/0]).
+-export([authenticate/2]).
+-export([check_acl/3, reload_acl/0, lookup_mods/1]).
 -export([clean_acl_cache/1, clean_acl_cache/2]).
+-export([register_mod/3, register_mod/4, unregister_mod/2]).
+-export([stop/0]).
 
 %% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
 
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
@@ -35,9 +36,9 @@
 
 -record(state, {}).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% API
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 %% @doc Start access control server.
 -spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
@@ -58,34 +59,34 @@ register_default_mod() ->
     end.
 
 %% @doc Authenticate Client.
--spec(auth(Client :: client(), Password :: password())
+-spec(authenticate(Client :: client(), Password :: password())
       -> ok | {ok, boolean()} | {error, term()}).
-auth(Client, Password) when is_record(Client, client) ->
-    auth(Client, Password, lookup_mods(auth)).
-auth(_Client, _Password, []) ->
-    case emqx_config:get_env(allow_anonymous, false) of
+authenticate(Client, Password) when is_record(Client, client) ->
+    authenticate(Client, Password, lookup_mods(auth)).
+
+authenticate(#client{zone = Zone}, _Password, []) ->
+    case emqx_zone:get_env(Zone, allow_anonymous, false) of
         true  -> ok;
         false -> {error, "No auth module to check!"}
     end;
-auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
+
+authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) ->
     case catch Mod:check(Client, Password, State) of
         ok              -> ok;
         {ok, IsSuper}   -> {ok, IsSuper};
-        ignore          -> auth(Client, Password, Mods);
+        ignore          -> authenticate(Client, Password, Mods);
         {error, Reason} -> {error, Reason};
         {'EXIT', Error} -> {error, Error}
     end.
 
 %% @doc Check ACL
--spec(check_acl(Client, PubSub, Topic) -> allow | deny when
-      Client :: client(),
-      PubSub :: pubsub(),
-      Topic  :: topic()).
+-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
 check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
     check_acl(Client, PubSub, Topic, lookup_mods(acl)).
 
-check_acl(_Client, _PubSub, _Topic, []) ->
-    emqx_config:get_env(acl_nomatch, allow);
+check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
+    emqx_zone:get_env(Zone, acl_nomatch, deny);
+
 check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
     case Mod:check_acl({Client, PubSub, Topic}, State) of
         allow  -> allow;
@@ -175,15 +176,15 @@ handle_call(stop, _From, State) ->
     {stop, normal, ok, State};
 
 handle_call(Req, _From, State) ->
-    emqx_logger:error("[AccessControl] Unexpected request: ~p", [Req]),
+    emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]),
     {reply, ignore, State}.
 
 handle_cast(Msg, State) ->
-    emqx_logger:error("[AccessControl] Unexpected msg: ~p", [Msg]),
+    emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    emqx_logger:error("[AccessControl] Unexpected info: ~p", [Info]),
+    emqx_logger:error("[AccessControl] unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 1 - 1
src/emqx_bridge1.erl

@@ -19,7 +19,7 @@
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 
- -import(proplists, [get_value/2, get_value/3]).
+-import(proplists, [get_value/2, get_value/3]).
 
 -export([start_link/2, start_bridge/1, stop_bridge/1, status/1]).
 

+ 16 - 16
src/emqx_client.erl

@@ -231,22 +231,22 @@ subscribe(Client, Properties, Topic, Opts)
     subscribe(Client, Properties, [{Topic, Opts}]).
 
 parse_subopt(Opts) ->
-    parse_subopt(Opts, #mqtt_subopts{}).
-
-parse_subopt([], Rec) ->
-    Rec;
-parse_subopt([{rh, I} | Opts], Rec) when I >= 0, I =< 2 ->
-    parse_subopt(Opts, Rec#mqtt_subopts{rh = I});
-parse_subopt([{rap, true} | Opts], Rec) ->
-    parse_subopt(Opts, Rec#mqtt_subopts{rap =1});
-parse_subopt([{rap, false} | Opts], Rec) ->
-    parse_subopt(Opts, Rec#mqtt_subopts{rap = 0});
-parse_subopt([{nl, true} | Opts], Rec) ->
-    parse_subopt(Opts, Rec#mqtt_subopts{nl = 1});
-parse_subopt([{nl, false} | Opts], Rec) ->
-    parse_subopt(Opts, Rec#mqtt_subopts{nl = 0});
-parse_subopt([{qos, QoS} | Opts], Rec) ->
-    parse_subopt(Opts, Rec#mqtt_subopts{qos = ?QOS_I(QoS)}).
+    parse_subopt(Opts, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}).
+
+parse_subopt([], Result) ->
+    Result;
+parse_subopt([{rh, I} | Opts], Result) when I >= 0, I =< 2 ->
+    parse_subopt(Opts, Result#{rh := I});
+parse_subopt([{rap, true} | Opts], Result) ->
+    parse_subopt(Opts, Result#{rap := 1});
+parse_subopt([{rap, false} | Opts], Result) ->
+    parse_subopt(Opts, Result#{rap := 0});
+parse_subopt([{nl, true} | Opts], Result) ->
+    parse_subopt(Opts, Result#{nl := 1});
+parse_subopt([{nl, false} | Opts], Result) ->
+    parse_subopt(Opts, Result#{nl := 0});
+parse_subopt([{qos, QoS} | Opts], Result) ->
+    parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
 
 -spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
 publish(Client, Topic, Payload) when is_binary(Topic) ->

+ 103 - 116
src/emqx_connection.erl

@@ -18,40 +18,34 @@
 
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
--include("emqx_misc.hrl").
 
 -export([start_link/3]).
 -export([info/1, stats/1, kick/1]).
 -export([session/1]).
--export([clean_acl_cache/1]).
--export([get_rate_limit/1, set_rate_limit/2]).
--export([get_pub_limit/1, set_pub_limit/2]).
 
 %% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
-         terminate/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
 
 -record(state, {
-          transport,    %% Network transport module
-          socket,       %% TCP or SSL Socket
-          peername,     %% Peername of the socket
-          sockname,     %% Sockname of the socket
-          conn_state,   %% Connection state: running | blocked
-          await_recv,   %% Awaiting recv
-          incoming,     %% Incoming bytes and packets
-          pub_limit,    %% Publish rate limit
-          rate_limit,   %% Traffic rate limit
-          limit_timer,  %% Rate limit timer
-          proto_state,  %% MQTT protocol state
-          parser_state, %% MQTT parser state
-          keepalive,    %% MQTT keepalive timer
-          enable_stats, %% Enable stats
-          stats_timer,  %% Stats timer
-          idle_timeout  %% Connection idle timeout
+          transport,
+          socket,
+          peername,
+          sockname,
+          conn_state,
+          await_recv,
+          proto_state,
+          parser_state,
+          keepalive,
+          enable_stats,
+          stats_timer,
+          incoming,
+          rate_limit,
+          publish_limit,
+          limit_timer,
+          idle_timeout
          }).
 
--define(INFO_KEYS, [peername, sockname, conn_state, await_recv, rate_limit, pub_limit]).
-
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
 -define(LOG(Level, Format, Args, State),
@@ -66,31 +60,19 @@ start_link(Transport, Socket, Options) ->
 %%------------------------------------------------------------------------------
 
 info(CPid) ->
-    gen_server:call(CPid, info).
+    call(CPid, info).
 
 stats(CPid) ->
-    gen_server:call(CPid, stats).
+    call(CPid, stats).
 
 kick(CPid) ->
-    gen_server:call(CPid, kick).
+    call(CPid, kick).
 
 session(CPid) ->
-    gen_server:call(CPid, session, infinity).
-
-clean_acl_cache(CPid) ->
-    gen_server:call(CPid, clean_acl_cache).
-
-get_rate_limit(CPid) ->
-    gen_server:call(CPid, get_rate_limit).
+    call(CPid, session).
 
-set_rate_limit(CPid, Rl = {_Rate, _Burst}) ->
-    gen_server:call(CPid, {set_rate_limit, Rl}).
-
-get_pub_limit(CPid) ->
-    gen_server:call(CPid, get_pub_limit).
-
-set_pub_limit(CPid, Rl = {_Rate, _Burst}) ->
-    gen_server:call(CPid, {set_pub_limit, Rl}).
+call(CPid, Req) ->
+    gen_server:call(CPid, Req, infinity).
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
@@ -103,60 +85,77 @@ init([Transport, RawSocket, Options]) ->
             {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
             {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
             Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
-            PubLimit = rate_limit(emqx_zone:env(Zone, publish_limit)),
-            RateLimit = rate_limit(proplists:get_value(rate_limit, Options)),
-            EnableStats = emqx_zone:env(Zone, enable_stats, true),
-            IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000),
+            RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
+            PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
+            EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
+            IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
             SendFun = send_fun(Transport, Socket, Peername),
             ProtoState = emqx_protocol:init(#{peername => Peername,
                                               sockname => Sockname,
                                               peercert => Peercert,
                                               sendfun  => SendFun}, Options),
             ParserState = emqx_protocol:parser(ProtoState),
-            State = run_socket(#state{transport    = Transport,
-                                      socket       = Socket,
-                                      peername     = Peername,
-                                      await_recv   = false,
-                                      conn_state   = running,
-                                      rate_limit   = RateLimit,
-                                      pub_limit    = PubLimit,
-                                      proto_state  = ProtoState,
-                                      parser_state = ParserState,
-                                      enable_stats = EnableStats,
-                                      idle_timeout = IdleTimout}),
+            State = run_socket(#state{transport     = Transport,
+                                       socket        = Socket,
+                                       peername      = Peername,
+                                       await_recv    = false,
+                                       conn_state    = running,
+                                       rate_limit    = RateLimit,
+                                       publish_limit = PubLimit,
+                                       proto_state   = ProtoState,
+                                       parser_state  = ParserState,
+                                       enable_stats  = EnableStats,
+                                       idle_timeout  = IdleTimout}),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->
             {stop, Reason}
     end.
 
-rate_limit(undefined) ->
+init_limiter(undefined) ->
     undefined;
-rate_limit({Rate, Burst}) ->
+init_limiter({Rate, Burst}) ->
     esockd_rate_limit:new(Rate, Burst).
 
 send_fun(Transport, Socket, Peername) ->
     fun(Data) ->
         try Transport:async_send(Socket, Data) of
-            ok  ->
-                ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}),
-                emqx_metrics:inc('bytes/sent', iolist_size(Data)), ok;
+            ok ->
+                ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
+                emqx_metrics:inc('bytes/sent', iolist_size(Data)),
+                ok;
             Error -> Error
         catch
-            error:Error -> {error, Error}
+            error:Error ->
+                {error, Error}
         end
     end.
 
-handle_call(info, From, State = #state{transport = Transport, socket = Socket, proto_state = ProtoState}) ->
+handle_call(info, _From, State = #state{transport     = Transport,
+                                        socket        = Socket,
+                                        peername      = Peername,
+                                        sockname      = Sockname,
+                                        conn_state    = ConnState,
+                                        await_recv    = AwaitRecv,
+                                        rate_limit    = RateLimit,
+                                        publish_limit = PubLimit,
+                                        proto_state   = ProtoState}) ->
+    ConnInfo = [{socktype, Transport:type(Socket)},
+                {peername, Peername},
+                {sockname, Sockname},
+                {conn_state, ConnState},
+                {await_recv, AwaitRecv},
+                {rate_limit, esockd_rate_limit:info(RateLimit)},
+                {publish_limit, esockd_rate_limit:info(PubLimit)}],
     ProtoInfo = emqx_protocol:info(ProtoState),
-    ConnInfo = [{socktype, Transport:type(Socket)} | ?record_to_proplist(state, State, ?INFO_KEYS)],
-    StatsInfo = element(2, handle_call(stats, From, State)),
-    {reply, lists:append([ConnInfo, StatsInfo, ProtoInfo]), State};
+    {reply, lists:usort(lists:append([ConnInfo, ProtoInfo])), State};
 
-handle_call(stats, _From, State = #state{transport = Transport, socket = Sock, proto_state = ProtoState}) ->
+handle_call(stats, _From, State = #state{transport   = Transport,
+                                          socket      = Socket,
+                                          proto_state = ProtoState}) ->
     ProcStats = emqx_misc:proc_stats(),
     ProtoStats = emqx_protocol:stats(ProtoState),
-    SockStats = case Transport:getstat(Sock, ?SOCK_STATS) of
+    SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
                     {ok, Ss}   -> Ss;
                     {error, _} -> []
                 end,
@@ -168,21 +167,6 @@ handle_call(kick, _From, State) ->
 handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
     {reply, emqx_protocol:session(ProtoState), State};
 
-handle_call(clean_acl_cache, _From, State = #state{proto_state = ProtoState}) ->
-    {reply, ok, State#state{proto_state = emqx_protocol:clean_acl_cache(ProtoState)}};
-
-handle_call(get_rate_limit, _From, State = #state{rate_limit = Rl}) ->
-    {reply, esockd_rate_limit:info(Rl), State};
-
-handle_call({set_rate_limit, {Rate, Burst}}, _From, State) ->
-    {reply, ok, State#state{rate_limit = esockd_rate_limit:new(Rate, Burst)}};
-
-handle_call(get_publish_limit, _From, State = #state{pub_limit = Rl}) ->
-    {reply, esockd_rate_limit:info(Rl), State};
-
-handle_call({set_publish_limit, {Rate, Burst}}, _From, State) ->
-    {reply, ok, State#state{pub_limit = esockd_rate_limit:new(Rate, Burst)}};
-
 handle_call(Req, _From, State) ->
     ?LOG(error, "unexpected call: ~p", [Req], State),
     {reply, ignored, State}.
@@ -203,7 +187,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
 
 handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
     Stats = element(2, handle_call(stats, undefined, State)),
-    emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats),
+    emqx_cm:set_client_stats(emqx_protocol:client_id(ProtoState), Stats),
     {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info(timeout, State) ->
@@ -220,8 +204,8 @@ handle_info(activate_sock, State) ->
     {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
 
 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
-    Size = iolist_size(Data),
     ?LOG(debug, "RECV ~p", [Data], State),
+    Size = iolist_size(Data),
     emqx_metrics:inc('bytes/received', Size),
     Incoming = #{bytes => Size, packets => 0},
     handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
@@ -247,7 +231,6 @@ handle_info({keepalive, start, Interval}, State = #state{transport = Transport,
         {ok, KeepAlive} ->
             {noreply, State#state{keepalive = KeepAlive}};
         {error, Error} ->
-            ?LOG(warning, "Keepalive error - ~p", [Error], State),
             shutdown(Error, State)
     end;
 
@@ -256,10 +239,8 @@ handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
         {ok, KeepAlive1} ->
             {noreply, State#state{keepalive = KeepAlive1}};
         {error, timeout} ->
-            ?LOG(debug, "Keepalive timeout", [], State),
             shutdown(keepalive_timeout, State);
         {error, Error} ->
-            ?LOG(warning, "Keepalive error - ~p", [Error], State),
             shutdown(Error, State)
     end;
 
@@ -286,28 +267,25 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 %%------------------------------------------------------------------------------
-%% Internal functions
+%% Parse and handle packets
 %%------------------------------------------------------------------------------
 
 %% Receive and parse data
 handle_packet(<<>>, State) ->
     {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
 
-handle_packet(Bytes, State = #state{incoming     = Incoming,
-                                    parser_state = ParserState,
-                                    proto_state  = ProtoState,
-                                    idle_timeout = IdleTimeout}) ->
-    case catch emqx_frame:parse(Bytes, ParserState) of
+handle_packet(Data, State = #state{proto_state  = ProtoState,
+                                   parser_state = ParserState,
+                                   idle_timeout = IdleTimeout}) ->
+    case catch emqx_frame:parse(Data, ParserState) of
         {more, NewParserState} ->
             {noreply, State#state{parser_state = NewParserState}, IdleTimeout};
         {ok, Packet = ?PACKET(Type), Rest} ->
             emqx_metrics:received(Packet),
             case emqx_protocol:received(Packet, ProtoState) of
                 {ok, ProtoState1} ->
-                    ParserState1 = emqx_protocol:parser(ProtoState1),
-                    handle_packet(Rest, State#state{incoming     = count_packets(Type, Incoming),
-                                                    proto_state  = ProtoState1,
-                                                    parser_state = ParserState1});
+                    NewState = State#state{proto_state = ProtoState1},
+                    handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
                 {error, Error} ->
                     ?LOG(error, "Protocol error - ~p", [Error], State),
                     shutdown(Error, State);
@@ -320,20 +298,27 @@ handle_packet(Bytes, State = #state{incoming     = Incoming,
             ?LOG(error, "Framing error - ~p", [Error], State),
             shutdown(Error, State);
         {'EXIT', Reason} ->
-            ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Bytes], State),
+            ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data], State),
             shutdown(parse_error, State)
     end.
 
-count_packets(?PUBLISH, Incoming = #{packets := Num}) ->
-    Incoming#{packets := Num + 1};
-count_packets(?SUBSCRIBE, Incoming = #{packets := Num}) ->
-    Incoming#{packets := Num + 1};
-count_packets(_Type, Incoming) ->
-    Incoming.
+reset_parser(State = #state{proto_state = ProtoState}) ->
+    State#state{parser_state = emqx_protocol:parser(ProtoState)}.
+
+inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}})
+    when Type == ?PUBLISH; Type == ?SUBSCRIBE ->
+    State#state{incoming = Incoming#{packets := Cnt + 1}};
+inc_publish_cnt(_Type, State) ->
+    State.
+
+%%------------------------------------------------------------------------------
+%% Ensure rate limit
+%%------------------------------------------------------------------------------
 
-ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl,
-                                 incoming = #{bytes := Bytes, packets := Pkts}}) ->
-    ensure_rate_limit([{Pl, #state.pub_limit, Pkts}, {Rl, #state.rate_limit, Bytes}], State).
+ensure_rate_limit(State = #state{rate_limit = Rl, publish_limit = Pl,
+                                 incoming = #{packets := Packets, bytes := Bytes}}) ->
+    ensure_rate_limit([{Pl, #state.publish_limit, Packets},
+                       {Rl, #state.rate_limit, Bytes}], State).
 
 ensure_rate_limit([], State) ->
     run_socket(State);
@@ -356,12 +341,15 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
     Transport:async_recv(Sock, 0, infinity),
     State#state{await_recv = true}.
 
+%%------------------------------------------------------------------------------
+%% Ensure stats timer
+%%------------------------------------------------------------------------------
+
 ensure_stats_timer(State = #state{enable_stats = true,
-                                  stats_timer  = undefined,
-                                  idle_timeout = IdleTimeout}) ->
+                                   stats_timer  = undefined,
+                                   idle_timeout = IdleTimeout}) ->
     State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)};
-ensure_stats_timer(State) ->
-    State.
+ensure_stats_timer(State) -> State.
 
 shutdown(Reason, State) ->
     stop({shutdown, Reason}, State).
@@ -370,7 +358,6 @@ stop(Reason, State) ->
     {stop, Reason, State}.
 
 maybe_gc(State) ->
-    State. %% TODO:...
-    %%Cb = fun() -> Transport:gc(Sock), end,
-    %%emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb).
+    %% TODO: gc and shutdown policy
+    State.
 

+ 5 - 4
src/emqx_frame.erl

@@ -31,7 +31,8 @@
 
 -export_type([options/0, parse_state/0]).
 
--define(DEFAULT_OPTIONS, #{max_packet_size => ?MAX_PACKET_SIZE, version => ?MQTT_PROTO_V4}).
+-define(DEFAULT_OPTIONS, #{max_packet_size => ?MAX_PACKET_SIZE,
+                           version         => ?MQTT_PROTO_V4}).
 
 %%------------------------------------------------------------------------------
 %% Init parse state
@@ -330,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
     {Value + Len * Multiplier, Rest}.
 
 parse_topic_filters(subscribe, Bin) ->
-    [{Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}}
+    [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
      || <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
 
 parse_topic_filters(unsubscribe, Bin) ->
@@ -576,12 +577,12 @@ serialize_property('Shared-Subscription-Available', Val) ->
 serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
     << <<(serialize_utf8_string(Topic))/binary,
          ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >>
-       || {Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}}
+       || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}}
           <- TopicFilters >>;
 
 serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
     << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>>
-       || {Topic, #mqtt_subopts{qos = QoS}} <- TopicFilters >>;
+       || {Topic, #{qos := QoS}} <- TopicFilters >>;
 
 serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
     << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.

+ 138 - 0
src/emqx_mqtt_caps.erl

@@ -0,0 +1,138 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+%% @doc MQTTv5 capabilities
+-module(emqx_mqtt_caps).
+
+-include("emqx.hrl").
+-include("emqx_mqtt.hrl").
+
+-export([check_pub/2, check_sub/2]).
+-export([get_caps/1, get_caps/2]).
+
+-type(caps() :: #{max_packet_size  => integer(),
+                  max_clientid_len => integer(),
+                  max_topic_alias  => integer(),
+                  max_topic_levels => integer(),
+                  max_qos_allowed  => mqtt_qos(),
+                  mqtt_retain_available      => boolean(),
+                  mqtt_shared_subscription   => boolean(),
+                  mqtt_wildcard_subscription => boolean()}).
+
+-export_type([caps/0]).
+
+-define(UNLIMITED, 0).
+-define(DEFAULT_CAPS, [{max_packet_size,  ?MAX_PACKET_SIZE},
+                       {max_clientid_len, ?MAX_CLIENTID_LEN},
+                       {max_topic_alias,  ?UNLIMITED},
+                       {max_topic_levels, ?UNLIMITED},
+                       {max_qos_allowed,  ?QOS_2},
+                       {mqtt_retain_available,      true},
+                       {mqtt_shared_subscription,   true},
+                       {mqtt_wildcard_subscription, true}]).
+
+-define(PUBCAP_KEYS, [max_qos_allowed,
+                      mqtt_retain_available]).
+-define(SUBCAP_KEYS, [max_qos_allowed,
+                      max_topic_levels,
+                      mqtt_shared_subscription,
+                      mqtt_wildcard_subscription]).
+
+-spec(check_pub(zone(), map()) -> ok | {error, mqtt_reason_code()}).
+check_pub(Zone, Props) when is_map(Props) ->
+    do_check_pub(Props, maps:to_list(get_caps(Zone, publish))).
+
+do_check_pub(_Props, []) ->
+    ok;
+do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
+    case QoS > MaxQoS of
+        true  -> {error, ?RC_QOS_NOT_SUPPORTED};
+        false -> do_check_pub(Props, Caps)
+    end;
+do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
+    {error, ?RC_RETAIN_NOT_SUPPORTED};
+do_check_pub(Props, [{mqtt_retain_available, true}|Caps]) ->
+    do_check_pub(Props, Caps).
+
+-spec(check_sub(zone(), mqtt_topic_filters()) -> {ok | error, mqtt_topic_filters()}).
+check_sub(Zone, TopicFilters) ->
+    Caps = maps:to_list(get_caps(Zone, subscribe)),
+    lists:foldr(fun({Topic, Opts}, {Ok, Result}) ->
+                    case check_sub(Topic, Opts, Caps) of
+                        {ok, Opts1} ->
+                            {Ok, [{Topic, Opts1}|Result]};
+                        {error, Opts1} ->
+                            {error, [{Topic, Opts1}|Result]}
+                    end
+                end, {ok, []}, TopicFilters).
+
+check_sub(_Topic, Opts, []) ->
+    {ok, Opts};
+check_sub(Topic, Opts = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
+    check_sub(Topic, Opts#{qos := min(QoS, MaxQoS)}, Caps);
+check_sub(Topic, Opts, [{mqtt_shared_subscription, true}|Caps]) ->
+    check_sub(Topic, Opts, Caps);
+check_sub(Topic, Opts, [{mqtt_shared_subscription, false}|Caps]) ->
+    case maps:is_key(share, Opts) of
+        true  ->
+            {error, Opts#{rc := ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}};
+        false -> check_sub(Topic, Opts, Caps)
+    end;
+check_sub(Topic, Opts, [{mqtt_wildcard_subscription, true}|Caps]) ->
+    check_sub(Topic, Opts, Caps);
+check_sub(Topic, Opts, [{mqtt_wildcard_subscription, false}|Caps]) ->
+    case emqx_topic:wildcard(Topic) of
+        true  ->
+            {error, Opts#{rc := ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}};
+        false -> check_sub(Topic, Opts, Caps)
+    end;
+check_sub(Topic, Opts, [{max_topic_levels, ?UNLIMITED}|Caps]) ->
+    check_sub(Topic, Opts, Caps);
+check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) ->
+    case emqx_topic:levels(Topic) of
+        Levels when Levels > Limit ->
+            {error, Opts#{rc := ?RC_TOPIC_FILTER_INVALID}};
+        _ -> check_sub(Topic, Opts, Caps)
+    end.
+
+get_caps(Zone, publish) ->
+    with_env(Zone, '$mqtt_pub_caps',
+             fun() ->
+                 filter_caps(?PUBCAP_KEYS, get_caps(Zone))
+             end);
+
+get_caps(Zone, subscribe) ->
+    with_env(Zone, '$mqtt_sub_caps',
+             fun() ->
+                 filter_caps(?SUBCAP_KEYS, get_caps(Zone))
+             end).
+
+get_caps(Zone) ->
+    with_env(Zone, '$mqtt_caps',
+             fun() ->
+                 maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
+                                 || {Cap, Def} <- ?DEFAULT_CAPS])
+             end).
+
+filter_caps(Keys, Caps) ->
+    maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
+
+with_env(Zone, Key, InitFun) ->
+    case emqx_zone:get_env(Zone, Key) of
+        undefined -> Caps = InitFun(),
+                     ok = emqx_zone:set_env(Zone, Key, Caps),
+                     Caps;
+        ZoneCaps  -> ZoneCaps
+    end.
+

+ 47 - 5
src/emqx_packet.erl

@@ -17,21 +17,59 @@
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 
--export([protocol_name/1, type_name/1]).
+-export([protocol_name/1]).
+-export([type_name/1]).
+-export([validate/1]).
 -export([format/1]).
 -export([to_message/2, from_message/2]).
 
 %% @doc Protocol name of version
 -spec(protocol_name(mqtt_version()) -> binary()).
-protocol_name(?MQTT_PROTO_V3) -> <<"MQIsdp">>;
-protocol_name(?MQTT_PROTO_V4) -> <<"MQTT">>;
-protocol_name(?MQTT_PROTO_V5) -> <<"MQTT">>.
+protocol_name(?MQTT_PROTO_V3) ->
+    <<"MQIsdp">>;
+protocol_name(?MQTT_PROTO_V4) ->
+    <<"MQTT">>;
+protocol_name(?MQTT_PROTO_V5) ->
+    <<"MQTT">>.
 
 %% @doc Name of MQTT packet type
 -spec(type_name(mqtt_packet_type()) -> atom()).
 type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH ->
     lists:nth(Type, ?TYPE_NAMES).
 
+validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) ->
+    error(packet_empty_topic_filters);
+validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) ->
+    validate_packet_id(PacketId)
+        andalso validate_properties(?SUBSCRIBE, Properties)
+            andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters);
+
+validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) ->
+    error(packet_empty_topic_filters);
+validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) ->
+    validate_packet_id(PacketId)
+        andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters);
+
+validate(_Packet) ->
+    true.
+
+validate_packet_id(0) ->
+    error(bad_packet_id);
+validate_packet_id(_) ->
+    true.
+
+validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := 0}) ->
+    error(bad_subscription_identifier);
+validate_properties(?SUBSCRIBE, _) ->
+    true.
+
+validate_subscription({Topic, #{qos := QoS}}) ->
+    emqx_topic:validate(filter, Topic) andalso validate_qos(QoS).
+
+validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 ->
+    true;
+validate_qos(_) -> error(bad_qos).
+
 %% @doc From Message to Packet
 -spec(from_message(mqtt_packet_id(), message()) -> mqtt_packet()).
 from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payload}) ->
@@ -56,7 +94,11 @@ to_message(ClientId, #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLI
                                                                   properties = Props},
                                   payload  = Payload}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
-    Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Props};
+    Msg#message{flags = #{dup => Dup, retain => Retain},
+                headers = if
+                              Props =:= undefined -> #{};
+                              true -> Props
+                          end};
 
 to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) ->
     undefined;

+ 5 - 1
src/emqx_pmon.erl

@@ -15,7 +15,11 @@
 -module(emqx_pmon).
 
 -export([new/0]).
--export([monitor/2, monitor/3, demonitor/2, find/2, erase/2]).
+-export([monitor/2, monitor/3]).
+-export([demonitor/2]).
+-export([find/2]).
+-export([erase/2]).
+
 -compile({no_auto_import,[monitor/3]}).
 
 -type(pmon() :: {?MODULE, map()}).

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 553 - 477
src/emqx_protocol.erl


+ 213 - 148
src/emqx_session.erl

@@ -11,7 +11,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
-%%
+
 %% @doc
 %% A stateful interaction between a Client and a Server. Some Sessions
 %% last only as long as the Network Connection, others can span multiple
@@ -35,28 +35,31 @@
 %% If the Session is currently not connected, the time at which the Session
 %% will end and Session State will be discarded.
 %% @end
+
 -module(emqx_session).
 
 -behaviour(gen_server).
 
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
--include("emqx_misc.hrl").
 
--export([start_link/1, close/1]).
+-export([start_link/1]).
 -export([info/1, stats/1]).
 -export([resume/2, discard/2]).
--export([subscribe/2]).%%, subscribe/3]).
+-export([subscribe/2, subscribe/4]).
 -export([publish/3]).
 -export([puback/2, puback/3]).
 -export([pubrec/2, pubrec/3]).
--export([pubrel/2, pubcomp/2]).
--export([unsubscribe/2]).
+-export([pubrel/3, pubcomp/3]).
+-export([unsubscribe/2, unsubscribe/4]).
+-export([close/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
+-import(emqx_zone, [get_env/2, get_env/3]).
+
 -record(state, {
           %% Clean Start Flag
           clean_start = false :: boolean(),
@@ -76,9 +79,6 @@
           %% Old client Pid that has been kickout
           old_client_pid :: pid(),
 
-          %% Pending sub/unsub requests
-          requests :: map(),
-
           %% Next packet id of the session
           next_pkt_id = 1 :: mqtt_packet_id(),
 
@@ -130,27 +130,28 @@
           %% Enable Stats
           enable_stats :: boolean(),
 
-          %% Force GC reductions
-          reductions = 0 :: non_neg_integer(),
+          %% Stats timer
+          stats_timer  :: reference() | undefined,
 
           %% Ignore loop deliver?
           ignore_loop_deliver = false :: boolean(),
 
+          %% TODO:
+          deliver_stats = 0,
+
+          %% TODO:
+          enqueue_stats = 0,
+
           %% Created at
           created_at :: erlang:timestamp()
         }).
 
 -define(TIMEOUT, 60000).
 
--define(DEFAULT_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}).
-
--define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]).
-
--define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
-                     next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
-                     max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
-                     await_rel_timeout, expiry_interval, enable_stats, force_gc_count,
-                     created_at]).
+-define(INFO_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
+                    next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
+                    max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
+                    await_rel_timeout, expiry_interval, enable_stats, created_at]).
 
 -define(LOG(Level, Format, Args, State),
         emqx_logger:Level([{client, State#state.client_id}],
@@ -159,7 +160,8 @@
 %% @doc Start a session
 -spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}).
 start_link(SessAttrs) ->
-    gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, 30000}]).
+    IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000),
+    gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]).
 
 %%------------------------------------------------------------------------------
 %% PubSub API
@@ -167,12 +169,17 @@ start_link(SessAttrs) ->
 
 -spec(subscribe(pid(), list({topic(), map()}) |
                 {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
-%% internal call
 subscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
-    %%TODO: Parse the topic filters?
-    subscribe(SPid, {undefined, #{}, TopicFilters});
+    gen_server:cast(SPid, {subscribe, [begin
+                                           {Topic, Opts} = emqx_topic:parse(RawTopic),
+                                           {Topic, maps:merge(
+                                                     maps:merge(
+                                                       ?DEFAULT_SUBOPTS, SubOpts), Opts)}
+                                       end || {RawTopic, SubOpts} <- TopicFilters]}).
+
 %% for mqtt 5.0
-subscribe(SPid, SubReq = {PacketId, Props, TopicFilters}) ->
+subscribe(SPid, PacketId, Properties, TopicFilters) ->
+    SubReq = {PacketId, Properties, TopicFilters},
     gen_server:cast(SPid, {subscribe, self(), SubReq}).
 
 -spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, delivery()} | {error, term()}).
@@ -190,31 +197,34 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) ->
 
 -spec(puback(pid(), mqtt_packet_id()) -> ok).
 puback(SPid, PacketId) ->
-    gen_server:cast(SPid, {puback, PacketId}).
+    gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}).
 
-puback(SPid, PacketId, {ReasonCode, Props}) ->
-    gen_server:cast(SPid, {puback, PacketId, {ReasonCode, Props}}).
+puback(SPid, PacketId, ReasonCode) ->
+    gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
 
 -spec(pubrec(pid(), mqtt_packet_id()) -> ok).
 pubrec(SPid, PacketId) ->
     gen_server:cast(SPid, {pubrec, PacketId}).
 
-pubrec(SPid, PacketId, {ReasonCode, Props}) ->
-    gen_server:cast(SPid, {pubrec, PacketId, {ReasonCode, Props}}).
+pubrec(SPid, PacketId, ReasonCode) ->
+    gen_server:cast(SPid, {pubrec, PacketId, ReasonCode}).
 
--spec(pubrel(pid(), mqtt_packet_id()) -> ok).
-pubrel(SPid, PacketId) ->
-    gen_server:cast(SPid, {pubrel, PacketId}).
+-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
+pubrel(SPid, PacketId, ReasonCode) ->
+    gen_server:cast(SPid, {pubrel, PacketId, ReasonCode}).
 
--spec(pubcomp(pid(), mqtt_packet_id()) -> ok).
-pubcomp(SPid, PacketId) ->
-    gen_server:cast(SPid, {pubcomp, PacketId}).
+-spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
+pubcomp(SPid, PacketId, ReasonCode) ->
+    gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
 
 -spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
 unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
     %%TODO: Parse the topic filters?
-    unsubscribe(SPid, {undefined, #{}, TopicFilters});
-unsubscribe(SPid, UnsubReq = {PacketId, Properties, TopicFilters}) ->
+    unsubscribe(SPid, {undefined, #{}, TopicFilters}).
+
+%% TODO:...
+unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
+    UnsubReq = {PacketId, Properties, TopicFilters},
     gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
 
 -spec(resume(pid(), pid()) -> ok).
@@ -226,20 +236,52 @@ resume(SPid, ClientPid) ->
 info(SPid) when is_pid(SPid) ->
     gen_server:call(SPid, info);
 
-info(State) when is_record(State, state) ->
-    ?record_to_proplist(state, State, ?INFO_KEYS).
+info(#state{clean_start = CleanStart,
+            binding = Binding,
+            client_id = ClientId,
+            username = Username,
+            max_subscriptions = MaxSubscriptions,
+            subscriptions = Subscriptions,
+            upgrade_qos = UpgradeQoS,
+            inflight = Inflight,
+            max_inflight = MaxInflight,
+            retry_interval = RetryInterval,
+            mqueue = MQueue,
+            awaiting_rel = AwaitingRel,
+            max_awaiting_rel = MaxAwaitingRel,
+            await_rel_timeout = AwaitRelTimeout,
+            expiry_interval = ExpiryInterval,
+            created_at = CreatedAt}) ->
+    [{clean_start, CleanStart},
+     {binding, Binding},
+     {client_id, ClientId},
+     {username, Username},
+     {max_subscriptions, MaxSubscriptions},
+     {subscriptions, maps:size(Subscriptions)},
+     {upgrade_qos, UpgradeQoS},
+     {inflight, emqx_inflight:size(Inflight)},
+     {max_inflight, MaxInflight},
+     {retry_interval, RetryInterval},
+     {mqueue_len, emqx_mqueue:len(MQueue)},
+     {awaiting_rel, maps:size(AwaitingRel)},
+     {max_awaiting_rel, MaxAwaitingRel},
+     {await_rel_timeout, AwaitRelTimeout},
+     {expiry_interval, ExpiryInterval},
+     {created_at, CreatedAt}].
 
 -spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
 stats(SPid) when is_pid(SPid) ->
-    gen_server:call(SPid, stats);
+    gen_server:call(SPid, stats, infinity);
 
 stats(#state{max_subscriptions = MaxSubscriptions,
-             subscriptions     = Subscriptions,
-             inflight          = Inflight,
-             max_inflight      = MaxInflight,
-             mqueue            = MQueue,
-             max_awaiting_rel  = MaxAwaitingRel,
-             awaiting_rel      = AwaitingRel}) ->
+              subscriptions     = Subscriptions,
+              inflight          = Inflight,
+              max_inflight      = MaxInflight,
+              mqueue            = MQueue,
+              max_awaiting_rel  = MaxAwaitingRel,
+              awaiting_rel      = AwaitingRel,
+              deliver_stats     = DeliverMsg,
+              enqueue_stats     = EnqueueMsg}) ->
     lists:append(emqx_misc:proc_stats(),
                  [{max_subscriptions, MaxSubscriptions},
                   {subscriptions,     maps:size(Subscriptions)},
@@ -250,8 +292,8 @@ stats(#state{max_subscriptions = MaxSubscriptions,
                   {mqueue_dropped,    emqx_mqueue:dropped(MQueue)},
                   {max_awaiting_rel,  MaxAwaitingRel},
                   {awaiting_rel_len,  maps:size(AwaitingRel)},
-                  {deliver_msg,       get(deliver_msg)},
-                  {enqueue_msg,       get(enqueue_msg)}]).
+                  {deliver_msg,       DeliverMsg},
+                  {enqueue_msg,       EnqueueMsg}]).
 
 %% @doc Discard the session
 -spec(discard(pid(), client_id()) -> ok).
@@ -270,41 +312,41 @@ init(#{zone        := Zone,
        client_id   := ClientId,
        client_pid  := ClientPid,
        clean_start := CleanStart,
-       username    := Username}) ->
+       username    := Username,
+       %% TODO:
+       conn_props  := _ConnProps}) ->
     process_flag(trap_exit, true),
     true = link(ClientPid),
-    init_stats([deliver_msg, enqueue_msg]),
-    MaxInflight = emqx_zone:env(Zone, max_inflight),
+    MaxInflight = get_env(Zone, max_inflight),
     State = #state{clean_start       = CleanStart,
                    binding           = binding(ClientPid),
                    client_id         = ClientId,
                    client_pid        = ClientPid,
                    username          = Username,
                    subscriptions     = #{},
-                   max_subscriptions = emqx_zone:env(Zone, max_subscriptions, 0),
-                   upgrade_qos       = emqx_zone:env(Zone, upgrade_qos, false),
+                   max_subscriptions = get_env(Zone, max_subscriptions, 0),
+                   upgrade_qos       = get_env(Zone, upgrade_qos, false),
                    max_inflight      = MaxInflight,
                    inflight          = emqx_inflight:new(MaxInflight),
                    mqueue            = init_mqueue(Zone, ClientId),
-                   retry_interval    = emqx_zone:env(Zone, retry_interval, 0),
+                   retry_interval    = get_env(Zone, retry_interval, 0),
                    awaiting_rel      = #{},
-                   await_rel_timeout = emqx_zone:env(Zone, await_rel_timeout),
-                   max_awaiting_rel  = emqx_zone:env(Zone, max_awaiting_rel),
-                   expiry_interval   = emqx_zone:env(Zone, session_expiry_interval),
-                   enable_stats      = emqx_zone:env(Zone, enable_stats, true),
-                   ignore_loop_deliver = emqx_zone:env(Zone, ignore_loop_deliver, true),
+                   await_rel_timeout = get_env(Zone, await_rel_timeout),
+                   max_awaiting_rel  = get_env(Zone, max_awaiting_rel),
+                   expiry_interval   = get_env(Zone, session_expiry_interval),
+                   enable_stats      = get_env(Zone, enable_stats, true),
+                   ignore_loop_deliver = get_env(Zone, ignore_loop_deliver, false),
+                   deliver_stats      = 0,
+                   enqueue_stats      = 0,
                    created_at        = os:timestamp()},
     emqx_sm:register_session(ClientId, info(State)),
     emqx_hooks:run('session.created', [ClientId]),
-    {ok, emit_stats(State), hibernate}.
+    {ok, ensure_stats_timer(State), hibernate}.
 
 init_mqueue(Zone, ClientId) ->
     emqx_mqueue:new(ClientId, #{type => simple,
-                                max_len => emqx_zone:env(Zone, max_mqueue_len),
-                                store_qos0 => emqx_zone:env(Zone, mqueue_store_qos0)}).
-
-init_stats(Keys) ->
-    lists:foreach(fun(K) -> put(K, 0) end, Keys).
+                                max_len => get_env(Zone, max_mqueue_len),
+                                store_qos0 => get_env(Zone, mqueue_store_qos0)}).
 
 binding(ClientPid) ->
     case node(ClientPid) =:= node() of true -> local; false -> remote end.
@@ -347,11 +389,27 @@ handle_call(Req, _From, State) ->
     emqx_logger:error("[Session] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}},
+%% SUBSCRIBE:
+handle_cast({subscribe, TopicFilters}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
+    Subscriptions1 = lists:foldl(
+                       fun({Topic, SubOpts}, SubMap) ->
+                               case maps:find(Topic, SubMap) of
+                                   {ok, _OldOpts} ->
+                                       emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
+                                       emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
+                                       ?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State);
+                                   error ->
+                                       emqx_broker:subscribe(Topic, ClientId, SubOpts),
+                                       emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts])
+                               end,
+                               maps:put(Topic, SubOpts, SubMap)
+                       end, Subscriptions, TopicFilters),
+    {noreply, State#state{subscriptions = Subscriptions1}};
+
+handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}},
             State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
-    ?LOG(info, "Subscribe ~p", [TopicFilters], State),
     {ReasonCodes, Subscriptions1} =
-    lists:foldl(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
+    lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
                     {[QoS|RcAcc],
                      case maps:find(Topic, SubMap) of
                          {ok, SubOpts} ->
@@ -361,68 +419,64 @@ handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}},
                              emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
                              emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
                              ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State),
-                             maps:put(Topic, SubOpts, SubMap);
+                             maps:put(Topic, with_subid(Properties, SubOpts), SubMap);
                          error ->
                              emqx_broker:subscribe(Topic, ClientId, SubOpts),
                              emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
-                             maps:put(Topic, SubOpts, SubMap)
+                             maps:put(Topic, with_subid(Properties, SubOpts), SubMap)
                      end}
                 end, {[], Subscriptions}, TopicFilters),
-    suback(From, PacketId, lists:reverse(ReasonCodes)),
-    {noreply, emit_stats(State#state{subscriptions = Subscriptions1})};
+    suback(From, PacketId, ReasonCodes),
+    {noreply, State#state{subscriptions = Subscriptions1}};
 
+%% UNSUBSCRIBE:
 handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
             State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
-    ?LOG(info, "Unsubscribe ~p", [TopicFilters], State),
     {ReasonCodes, Subscriptions1} =
-    lists:foldl(fun(Topic, {RcAcc, SubMap}) ->
-                case maps:find(Topic, SubMap) of
-                    {ok, SubOpts} ->
-                        emqx_broker:unsubscribe(Topic, ClientId),
-                        emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
-                        {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)};
-                    error ->
-                        {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap}
-                end
-        end, {[], Subscriptions}, TopicFilters),
-    unsuback(From, PacketId, lists:reverse(ReasonCodes)),
-    {noreply, emit_stats(State#state{subscriptions = Subscriptions1})};
+    lists:foldr(fun(Topic, {RcAcc, SubMap}) ->
+                        case maps:find(Topic, SubMap) of
+                            {ok, SubOpts} ->
+                                emqx_broker:unsubscribe(Topic, ClientId),
+                                emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
+                                {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)};
+                            error ->
+                                {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap}
+                        end
+                end, {[], Subscriptions}, TopicFilters),
+    unsuback(From, PacketId, ReasonCodes),
+    {noreply, State#state{subscriptions = Subscriptions1}};
 
 %% PUBACK:
-handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
-    {noreply,
-     case emqx_inflight:contain(PacketId, Inflight) of
-         true ->
-             dequeue(acked(puback, PacketId, State));
-         false ->
-             ?LOG(warning, "PUBACK ~p missed inflight: ~p",
-                  [PacketId, emqx_inflight:window(Inflight)], State),
-             emqx_metrics:inc('packets/puback/missed'),
-             State
-     end, hibernate};
+handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
+    case emqx_inflight:contain(PacketId, Inflight) of
+        true ->
+            {noreply, dequeue(acked(puback, PacketId, State))};
+        false ->
+            ?LOG(warning, "The PUBACK PacketId is not found: ~p", [PacketId], State),
+            emqx_metrics:inc('packets/puback/missed'),
+            {noreply, State}
+    end;
 
-%% PUBREC:
-handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) ->
-    {noreply,
-     case emqx_inflight:contain(PacketId, Inflight) of
-         true ->
-             acked(pubrec, PacketId, State);
-         false ->
-             ?LOG(warning, "PUBREC ~p missed inflight: ~p",
-                  [PacketId, emqx_inflight:window(Inflight)], State),
-             emqx_metrics:inc('packets/pubrec/missed'),
-             State
-     end, hibernate};
+%% PUBREC: How to handle ReasonCode?
+handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
+    case emqx_inflight:contain(PacketId, Inflight) of
+        true ->
+            {noreply, acked(pubrec, PacketId, State)};
+        false ->
+            ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
+            emqx_metrics:inc('packets/pubrec/missed'),
+            {noreply, State}
+    end;
 
 %% PUBREL:
-handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
+handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) ->
     {noreply,
      case maps:take(PacketId, AwaitingRel) of
          {Msg, AwaitingRel1} ->
              %% Implement Qos2 by method A [MQTT 4.33]
              %% Dispatch to subscriber when received PUBREL
              emqx_broker:publish(Msg), %% FIXME:
-             gc(State#state{awaiting_rel = AwaitingRel1});
+             maybe_gc(State#state{awaiting_rel = AwaitingRel1});
          error ->
              ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
              emqx_metrics:inc('packets/pubrel/missed'),
@@ -430,17 +484,15 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
      end, hibernate};
 
 %% PUBCOMP:
-handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) ->
-    {noreply,
-     case emqx_inflight:contain(PacketId, Inflight) of
-         true ->
-             dequeue(acked(pubcomp, PacketId, State));
-         false ->
-             ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p",
-                  [PacketId, emqx_inflight:window(Inflight)], State),
-             emqx_metrics:inc('packets/pubcomp/missed'),
-             State
-     end, hibernate};
+handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
+    case emqx_inflight:contain(PacketId, Inflight) of
+        true ->
+            {noreply, dequeue(acked(pubcomp, PacketId, State))};
+        false ->
+            ?LOG(warning, "The PUBCOMP Packet Identifier is not found: ~w", [PacketId], State),
+            emqx_metrics:inc('packets/pubcomp/missed'),
+            {noreply, State}
+    end;
 
 %% RESUME:
 handle_cast({resume, ClientPid},
@@ -451,7 +503,7 @@ handle_cast({resume, ClientPid},
                            await_rel_timer = AwaitTimer,
                            expiry_timer    = ExpireTimer}) ->
 
-    ?LOG(info, "Resumed by ~p", [ClientPid], State),
+    ?LOG(info, "Resumed by ~p ", [ClientPid], State),
 
     %% Cancel Timers
     lists:foreach(fun emqx_misc:cancel_timer/1,
@@ -484,7 +536,7 @@ handle_cast({resume, ClientPid},
     end,
 
     %% Replay delivery and Dequeue pending messages
-    {noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
+    {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
 
 handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
@@ -502,17 +554,17 @@ handle_info({dispatch, _Topic, #message{from = ClientId}},
 
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
-    {noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
+    {noreply, maybe_gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
 
 %% Do nothing if the client has been disconnected.
 handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
-    {noreply, emit_stats(State#state{retry_timer = undefined})};
+    {noreply, ensure_stats_timer(State#state{retry_timer = undefined})};
 
 handle_info({timeout, _Timer, retry_delivery}, State) ->
-    {noreply, emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))};
+    {noreply, ensure_stats_timer(retry_delivery(false, State#state{retry_timer = undefined}))};
 
 handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
-    {noreply, expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))};
+    {noreply, ensure_stats_timer(expire_awaiting_rel(State#state{await_rel_timer = undefined}))};
 
 handle_info({timeout, _Timer, expired}, State) ->
     ?LOG(info, "Expired, shutdown now.", [], State),
@@ -529,7 +581,7 @@ handle_info({'EXIT', ClientPid, Reason},
     ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
     ExpireTimer = emqx_misc:start_timer(Interval, expired),
     State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
-    {noreply, emit_stats(State1), hibernate};
+    {noreply, State1, hibernate};
 
 handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
     %% ignore
@@ -540,6 +592,10 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
          [ClientPid, Pid, Reason], State),
     {noreply, State, hibernate};
 
+handle_info(emit_stats, State = #state{client_id = ClientId}) ->
+    emqx_sm:set_session_stats(ClientId, stats(State)),
+    {noreply, State#state{stats_timer = undefined}, hibernate};
+
 handle_info(Info, State) ->
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     {noreply, State}.
@@ -555,6 +611,10 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
+with_subid(#{'Subscription-Identifier' := SubId}, Opts) ->
+    maps:put(subid, SubId, Opts);
+with_subid(_Props, Opts) -> Opts.
+
 suback(_From, undefined, _ReasonCodes) ->
     ignore;
 suback(From, PacketId, ReasonCodes) ->
@@ -589,6 +649,7 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
             State;
         false ->
             Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
+            io:format("!!! Retry Delivery: ~p~n", [Msgs]),
             retry_delivery(Force, Msgs, os:timestamp(), State)
     end.
 
@@ -675,36 +736,39 @@ dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) ->
 
 %% Deliver qos0 message directly to client
 dispatch(Msg = #message{qos = ?QOS0}, State) ->
-    deliver(undefined, Msg, State), State;
+    deliver(undefined, Msg, State),
+    inc_stats(deliver, State);
 
 dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
     when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
     case emqx_inflight:is_full(Inflight) of
-        true  ->
+        true ->
             enqueue_msg(Msg, State);
         false ->
             deliver(PacketId, Msg, State),
-            await(PacketId, Msg, next_pkt_id(State))
+            %% TODO inc_stats??
+            await(PacketId, Msg, next_pkt_id(inc_stats(deliver, State)))
     end.
 
 enqueue_msg(Msg, State = #state{mqueue = Q}) ->
-    inc_stats(enqueue_msg),
-    State#state{mqueue = emqx_mqueue:in(Msg, Q)}.
+    inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
 
 %%------------------------------------------------------------------------------
 %% Deliver
 %%------------------------------------------------------------------------------
 
 redeliver({PacketId, Msg = #message{qos = QoS}}, State) ->
-    deliver(PacketId, if QoS =:= ?QOS2 -> Msg; true -> emqx_message:set_flag(dup, Msg) end, State);
+    deliver(PacketId, if QoS =:= ?QOS2 -> Msg;
+                         true -> emqx_message:set_flag(dup, Msg)
+                      end, State);
 
 redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
     Pid ! {deliver, {pubrel, PacketId}}.
 
 deliver(PacketId, Msg, #state{client_pid = Pid, binding = local}) ->
-    inc_stats(deliver_msg), Pid ! {deliver, {publish, PacketId, Msg}};
+    Pid ! {deliver, {publish, PacketId, Msg}};
 deliver(PacketId, Msg, #state{client_pid = Pid, binding = remote}) ->
-    inc_stats(deliver_msg), emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]).
+    emqx_rpc:cast(node(Pid), erlang, send, [Pid, {deliver, PacketId, Msg}]).
 
 %%------------------------------------------------------------------------------
 %% Awaiting ACK for QoS1/QoS2 Messages
@@ -802,27 +866,28 @@ next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) ->
 next_pkt_id(State = #state{next_pkt_id = Id}) ->
     State#state{next_pkt_id = Id + 1}.
 
-%%--------------------------------------------------------------------
-%% Emit session stats
+%%------------------------------------------------------------------------------
+%% Ensure stats timer
 
-emit_stats(State = #state{enable_stats = false}) ->
-    State;
-emit_stats(State = #state{client_id = ClientId}) ->
-    emqx_sm:set_session_stats(ClientId, stats(State)),
+ensure_stats_timer(State = #state{enable_stats = true,
+                                  stats_timer  = undefined}) ->
+    State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)};
+ensure_stats_timer(State) ->
     State.
 
-inc_stats(Key) -> put(Key, get(Key) + 1).
+inc_stats(deliver, State = #state{deliver_stats = I}) ->
+    State#state{deliver_stats = I + 1};
+inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
+    State#state{enqueue_stats = I + 1}.
 
 %%--------------------------------------------------------------------
 %% Helper functions
 
 reply(Reply, State) ->
-    {reply, Reply, State, hibernate}.
+    {reply, Reply, State}.
 
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
 
-gc(State) ->
-    State.
-    %%emqx_gc:maybe_force_gc(#state.force_gc_count, State).
+maybe_gc(State) -> State.
 

+ 1 - 1
src/emqx_shared_sub.erl

@@ -97,7 +97,7 @@ pick(SubPids) ->
     lists:nth((X rem length(SubPids)) + 1, SubPids).
 
 subscribers(Group, Topic) ->
-    ets:select(?TAB, [{{shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
+    ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
 
 %%-----------------------------------------------------------------------------
 %% gen_server callbacks

+ 38 - 35
src/emqx_sm.erl

@@ -20,8 +20,10 @@
 
 -export([start_link/0]).
 
--export([open_session/1, lookup_session/1, close_session/1, lookup_session_pid/1]).
--export([resume_session/1, resume_session/2, discard_session/1, discard_session/2]).
+-export([open_session/1, close_session/1]).
+-export([lookup_session/1, lookup_session_pid/1]).
+-export([resume_session/1, resume_session/2]).
+-export([discard_session/1, discard_session/2]).
 -export([register_session/2, get_session_attrs/1, unregister_session/1]).
 -export([get_session_stats/1, set_session_stats/2]).
 
@@ -29,7 +31,8 @@
 -export([dispatch/3]).
 
 %% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
 
 -record(state, {session_pmon}).
 
@@ -46,7 +49,7 @@ start_link() ->
     gen_server:start_link({local, ?SM}, ?MODULE, [], []).
 
 %% @doc Open a session.
--spec(open_session(map()) -> {ok, pid(), boolean()} | {error, term()}).
+-spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
 open_session(Attrs = #{clean_start := true,
                        client_id   := ClientId,
                        client_pid  := ClientPid}) ->
@@ -61,8 +64,8 @@ open_session(Attrs = #{clean_start := false,
                        client_pid  := ClientPid}) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId, ClientPid) of
-                          {ok, SessionPid} ->
-                              {ok, SessionPid};
+                          {ok, SPid} ->
+                              {ok, SPid, true};
                           {error, not_found} ->
                               emqx_session_sup:start_session(Attrs);
                           {error, Reason} ->
@@ -78,10 +81,10 @@ discard_session(ClientId) when is_binary(ClientId) ->
 
 discard_session(ClientId, ClientPid) when is_binary(ClientId) ->
     lists:foreach(
-      fun({_ClientId, SessionPid}) ->
-          case catch emqx_session:discard(SessionPid, ClientPid) of
+      fun({_ClientId, SPid}) ->
+          case catch emqx_session:discard(SPid, ClientPid) of
               {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
-                  emqx_logger:error("[SM] Failed to discard ~p: ~p", [SessionPid, Reason]);
+                  emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
               ok -> ok
           end
       end, lookup_session(ClientId)).
@@ -94,25 +97,25 @@ resume_session(ClientId) ->
 resume_session(ClientId, ClientPid) ->
     case lookup_session(ClientId) of
         [] -> {error, not_found};
-        [{_ClientId, SessionPid}] ->
-            ok = emqx_session:resume(SessionPid, ClientPid),
-            {ok, SessionPid};
+        [{_ClientId, SPid}] ->
+            ok = emqx_session:resume(SPid, ClientPid),
+            {ok, SPid};
         Sessions ->
-            [{_, SessionPid}|StaleSessions] = lists:reverse(Sessions),
+            [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
             emqx_logger:error("[SM] More than one session found: ~p", [Sessions]),
             lists:foreach(fun({_, StalePid}) ->
                               catch emqx_session:discard(StalePid, ClientPid)
                           end, StaleSessions),
-            ok = emqx_session:resume(SessionPid, ClientPid),
-            {ok, SessionPid}
+            ok = emqx_session:resume(SPid, ClientPid),
+            {ok, SPid}
     end.
 
 %% @doc Close a session.
 -spec(close_session({client_id(), pid()} | pid()) -> ok).
-close_session({_ClientId, SessionPid}) ->
-    emqx_session:close(SessionPid);
-close_session(SessionPid) when is_pid(SessionPid) ->
-    emqx_session:close(SessionPid).
+close_session({_ClientId, SPid}) ->
+    emqx_session:close(SPid);
+close_session(SPid) when is_pid(SPid) ->
+    emqx_session:close(SPid).
 
 %% @doc Register a session with attributes.
 -spec(register_session(client_id() | {client_id(), pid()},
@@ -120,8 +123,8 @@ close_session(SessionPid) when is_pid(SessionPid) ->
 register_session(ClientId, Attrs) when is_binary(ClientId) ->
     register_session({ClientId, self()}, Attrs);
 
-register_session(Session = {ClientId, SessionPid}, Attrs)
-    when is_binary(ClientId), is_pid(SessionPid) ->
+register_session(Session = {ClientId, SPid}, Attrs)
+    when is_binary(ClientId), is_pid(SPid) ->
     ets:insert(?SESSION, Session),
     ets:insert(?SESSION_ATTRS, {Session, Attrs}),
     case proplists:get_value(clean_start, Attrs, true) of
@@ -129,13 +132,13 @@ register_session(Session = {ClientId, SessionPid}, Attrs)
         false  -> ets:insert(?SESSION_P, Session)
     end,
     emqx_sm_registry:register_session(Session),
-    notify({registered, ClientId, SessionPid}).
+    notify({registered, ClientId, SPid}).
 
 %% @doc Get session attrs
 -spec(get_session_attrs({client_id(), pid()})
       -> list(emqx_session:attribute())).
-get_session_attrs(Session = {ClientId, SessionPid})
-    when is_binary(ClientId), is_pid(SessionPid) ->
+get_session_attrs(Session = {ClientId, SPid})
+    when is_binary(ClientId), is_pid(SPid) ->
     safe_lookup_element(?SESSION_ATTRS, Session, []).
 
 %% @doc Unregister a session
@@ -143,19 +146,19 @@ get_session_attrs(Session = {ClientId, SessionPid})
 unregister_session(ClientId) when is_binary(ClientId) ->
     unregister_session({ClientId, self()});
 
-unregister_session(Session = {ClientId, SessionPid})
-    when is_binary(ClientId), is_pid(SessionPid) ->
+unregister_session(Session = {ClientId, SPid})
+    when is_binary(ClientId), is_pid(SPid) ->
     emqx_sm_registry:unregister_session(Session),
     ets:delete(?SESSION_STATS, Session),
     ets:delete(?SESSION_ATTRS, Session),
     ets:delete_object(?SESSION_P, Session),
     ets:delete_object(?SESSION, Session),
-    notify({unregistered, ClientId, SessionPid}).
+    notify({unregistered, ClientId, SPid}).
 
 %% @doc Get session stats
 -spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
-get_session_stats(Session = {ClientId, SessionPid})
-    when is_binary(ClientId), is_pid(SessionPid) ->
+get_session_stats(Session = {ClientId, SPid})
+    when is_binary(ClientId), is_pid(SPid) ->
     safe_lookup_element(?SESSION_STATS, Session, []).
 
 %% @doc Set session stats
@@ -164,8 +167,8 @@ get_session_stats(Session = {ClientId, SessionPid})
 set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
     set_session_stats({ClientId, self()}, Stats);
 
-set_session_stats(Session = {ClientId, SessionPid}, Stats)
-    when is_binary(ClientId), is_pid(SessionPid) ->
+set_session_stats(Session = {ClientId, SPid}, Stats)
+    when is_binary(ClientId), is_pid(SPid) ->
     ets:insert(?SESSION_STATS, {Session, Stats}).
 
 %% @doc Lookup a session from registry
@@ -217,11 +220,11 @@ handle_call(Req, _From, State) ->
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({notify, {registered, ClientId, SessionPid}}, State = #state{session_pmon = PMon}) ->
-    {noreply, State#state{session_pmon = emqx_pmon:monitor(SessionPid, ClientId, PMon)}};
+handle_cast({notify, {registered, ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
+    {noreply, State#state{session_pmon = emqx_pmon:monitor(SPid, ClientId, PMon)}};
 
-handle_cast({notify, {unregistered, _ClientId, SessionPid}}, State = #state{session_pmon = PMon}) ->
-    {noreply, State#state{session_pmon = emqx_pmon:demonitor(SessionPid, PMon)}};
+handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #state{session_pmon = PMon}) ->
+    {noreply, State#state{session_pmon = emqx_pmon:demonitor(SPid, PMon)}};
 
 handle_cast(Msg, State) ->
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),

+ 39 - 27
src/emqx_topic.erl

@@ -17,10 +17,15 @@
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 
--import(lists, [reverse/1]).
-
--export([match/2, validate/1, triples/1, words/1, wildcard/1]).
--export([join/1, feed_var/3, systop/1]).
+-export([match/2]).
+-export([validate/1, validate/2]).
+-export([levels/1]).
+-export([triples/1]).
+-export([words/1]).
+-export([wildcard/1]).
+-export([join/1]).
+-export([feed_var/3]).
+-export([systop/1]).
 -export([parse/1, parse/2]).
 
 -type(word() :: '' | '+' | '#' | binary()).
@@ -69,15 +74,21 @@ match([_H1|_], []) ->
 match([], [_H|_T2]) ->
     false.
 
-%% @doc Validate Topic
--spec(validate({name | filter, topic()}) -> boolean()).
-validate({_, <<>>}) ->
-    false;
-validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
-    false;
-validate({filter, Topic}) when is_binary(Topic) ->
+%% @doc Validate topic name or filter
+-spec(validate(topic() | {name | filter, topic()}) -> true).
+validate(Topic) when is_binary(Topic) ->
+    validate(filter, Topic);
+validate({Type, Topic}) when Type =:= name; Type =:= filter ->
+    validate(Type, Topic).
+
+-spec(validate(name | filter, topic()) -> true).
+validate(_, <<>>) ->
+    error(empty_topic);
+validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
+    error(topic_too_long);
+validate(filter, Topic) when is_binary(Topic) ->
     validate2(words(Topic));
-validate({name, Topic}) when is_binary(Topic) ->
+validate(name, Topic) when is_binary(Topic) ->
     Words = words(Topic),
     validate2(Words) and (not wildcard(Words)).
 
@@ -86,7 +97,7 @@ validate2([]) ->
 validate2(['#']) -> % end with '#'
     true;
 validate2(['#'|Words]) when length(Words) > 0 ->
-    false;
+    error('topic_invalid_#');
 validate2([''|Words]) ->
     validate2(Words);
 validate2(['+'|Words]) ->
@@ -97,7 +108,7 @@ validate2([W|Words]) ->
 validate3(<<>>) ->
     true;
 validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
-    false;
+    error('topic_invalid_char');
 validate3(<<_/utf8, Rest/binary>>) ->
     validate3(Rest).
 
@@ -107,7 +118,7 @@ triples(Topic) when is_binary(Topic) ->
     triples(words(Topic), root, []).
 
 triples([], _Parent, Acc) ->
-    reverse(Acc);
+    lists:reverse(Acc);
 triples([W|Words], Parent, Acc) ->
     Node = join(Parent, W),
     triples(Words, Node, [{Parent, W, Node}|Acc]).
@@ -122,6 +133,9 @@ bin('+') -> <<"+">>;
 bin('#') -> <<"#">>;
 bin(B) when is_binary(B) -> B.
 
+levels(Topic) when is_binary(Topic) ->
+    length(words(Topic)).
+
 %% @doc Split Topic Path to Words
 -spec(words(topic()) -> words()).
 words(Topic) when is_binary(Topic) ->
@@ -142,7 +156,7 @@ systop(Name) when is_binary(Name) ->
 feed_var(Var, Val, Topic) ->
     feed_var(Var, Val, words(Topic), []).
 feed_var(_Var, _Val, [], Acc) ->
-    join(reverse(Acc));
+    join(lists:reverse(Acc));
 feed_var(Var, Val, [Var|Words], Acc) ->
     feed_var(Var, Val, Words, [Val|Acc]);
 feed_var(Var, Val, [W|Words], Acc) ->
@@ -166,17 +180,15 @@ join(Words) ->
 parse(Topic) when is_binary(Topic) ->
     parse(Topic, #{}).
 
-parse(Topic = <<"$queue/", Topic1/binary>>, Options) ->
-    case maps:find(share, Options) of
-        {ok, _} -> error({invalid_topic, Topic});
-        error   -> parse(Topic1, maps:put(share, '$queue', Options))
-    end;
-parse(Topic = <<"$share/", Topic1/binary>>, Options) ->
-    case maps:find(share, Options) of
-        {ok, _} -> error({invalid_topic, Topic});
-        error   -> [Group, Topic2] = binary:split(Topic1, <<"/">>),
-                   {Topic2, maps:put(share, Group, Options)}
-    end;
+parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
+    error({invalid_topic, Topic});
+parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) ->
+    error({invalid_topic, Topic});
+parse(<<"$queue/", Topic1/binary>>, Options) ->
+    parse(Topic1, maps:put(share, '$queue', Options));
+parse(<<"$share/", Topic1/binary>>, Options) ->
+    [Group, Topic2] = binary:split(Topic1, <<"/">>),
+    {Topic2, maps:put(share, Group, Options)};
 parse(Topic, Options) ->
     {Topic, Options}.
 

+ 2 - 2
src/emqx_trie.erl

@@ -118,8 +118,8 @@ add_path({Node, Word, Child}) ->
 
 %% @private
 %% @doc Match node with word or '+'.
-match_node(root, [<<"$SYS">>|Words]) ->
-    match_node(<<"$SYS">>, Words, []);
+match_node(root, [NodeId = <<$$, _/binary>>|Words]) ->
+    match_node(NodeId, Words, []);
 
 match_node(NodeId, Words) ->
     match_node(NodeId, Words, []).

+ 3 - 5
src/emqx_ws_connection.erl

@@ -16,7 +16,6 @@
 
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
--include("emqx_misc.hrl").
 
 -export([info/1]).
 -export([stats/1]).
@@ -44,9 +43,8 @@
           shutdown_reason
          }).
 
--define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-
 -define(INFO_KEYS, [peername, sockname]).
+-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 
 -define(WSLOG(Level, Format, Args, State),
         lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
@@ -110,8 +108,8 @@ websocket_init(#state{request = Req, options = Options}) ->
                                       sendfun  => send_fun(self())}, Options),
     ParserState = emqx_protocol:parser(ProtoState),
     Zone = proplists:get_value(zone, Options),
-    EnableStats = emqx_zone:env(Zone, enable_stats, true),
-    IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000),
+    EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
+    IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
     lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
     {ok, #state{peername     = Peername,
                 sockname     = Sockname,

+ 26 - 14
src/emqx_zone.erl

@@ -16,9 +16,11 @@
 
 -behaviour(gen_server).
 
--export([start_link/0]).
+-include("emqx.hrl").
 
--export([env/2, env/3]).
+-export([start_link/0]).
+-export([get_env/2, get_env/3]).
+-export([set_env/3]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -31,19 +33,25 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-env(undefined, Par) ->
-    emqx_config:get_env(Par);
-env(Zone, Par) ->
-    env(Zone, Par, undefined).
-
-env(undefined, Par, Default) ->
-    emqx_config:get_env(Par, Default);
-env(Zone, Par, Default) ->
-    try ets:lookup_element(?TAB, {Zone, Par}, 2)
+-spec(get_env(zone() | undefined, atom()) -> undefined | term()).
+get_env(undefined, Key) ->
+    emqx_config:get_env(Key);
+get_env(Zone, Key) ->
+    get_env(Zone, Key, undefined).
+
+-spec(get_env(zone() | undefined, atom(), term()) -> undefined | term()).
+get_env(undefined, Key, Def) ->
+    emqx_config:get_env(Key, Def);
+get_env(Zone, Key, Def) ->
+    try ets:lookup_element(?TAB, {Zone, Key}, 2)
     catch error:badarg ->
-        emqx_config:get_env(Par, Default)
+        emqx_config:get_env(Key, Def)
     end.
 
+-spec(set_env(zone(), atom(), term()) -> ok).
+set_env(Zone, Key, Val) ->
+    gen_server:cast(?MODULE, {set_env, Zone, Key, Val}).
+
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
@@ -56,6 +64,10 @@ handle_call(Req, _From, State) ->
     emqx_logger:error("[Zone] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
+handle_cast({set_env, Zone, Key, Val}, State) ->
+    true = ets:insert(?TAB, {{Zone, Key}, Val}),
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]),
     {noreply, State}.
@@ -63,7 +75,7 @@ handle_cast(Msg, State) ->
 handle_info(reload, State) ->
     lists:foreach(
       fun({Zone, Opts}) ->
-          [ets:insert(?TAB, {{Zone, Par}, Val}) || {Par, Val} <- Opts]
+          [ets:insert(?TAB, {{Zone, Key}, Val}) || {Key, Val} <- Opts]
       end, emqx_config:get_env(zones, [])),
     {noreply, ensure_reload_timer(State), hibernate};
 
@@ -82,5 +94,5 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 ensure_reload_timer(State) ->
-    State#state{timer = erlang:send_after(5000, self(), reload)}.
+    State#state{timer = erlang:send_after(10000, self(), reload)}.
 

+ 8 - 8
include/emqx_misc.hrl

@@ -12,15 +12,15 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 
--define(record_to_map(Def, Rec),
-        maps:from_list(?record_to_proplist(Def, Rec))).
+-module(emqx_mqtt_caps_SUITE).
 
--define(record_to_map(Def, Rec, Fields),
-        maps:from_list(?record_to_proplist(Def, Rec, Fields))).
+-include_lib("eunit/include/eunit.hrl").
 
--define(record_to_proplist(Def, Rec),
-        lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))).
+%% CT
+-compile(export_all).
+-compile(nowarn_export_all).
+
+all() ->
+    [].
 
--define(record_to_proplist(Def, Rec, Fields),
-        [{K, V} || {K, V} <- ?record_to_proplist(Def, Rec), lists:member(K, Fields)]).
 

+ 21 - 7
test/emqx_topic_SUITE.erl

@@ -1,5 +1,4 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -12,7 +11,6 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
-%%--------------------------------------------------------------------
 
 -module(emqx_topic_SUITE).
 
@@ -27,10 +25,23 @@
 
 -define(N, 10000).
 
-all() -> [t_wildcard, t_match, t_match2, t_match3, t_validate, t_triples, t_join,
-          t_words, t_systop, t_feed_var, t_sys_match, 't_#_match',
-          t_sigle_level_validate, t_sigle_level_match, t_match_perf,
-          t_triples_perf, t_parse].
+all() ->
+    [t_wildcard,
+     t_match, t_match2, t_match3,
+     t_validate,
+     t_triples,
+     t_join,
+     t_levels,
+     t_words,
+     t_systop,
+     t_feed_var,
+     t_sys_match,
+     't_#_match',
+     t_sigle_level_validate,
+     t_sigle_level_match,
+     t_match_perf,
+     t_triples_perf,
+     t_parse].
 
 t_wildcard(_) ->
     true  = wildcard(<<"a/b/#">>),
@@ -149,6 +160,9 @@ t_triples_perf(_) ->
         end),
     io:format("Time for triples: ~p(micro)", [Time/?N]).
 
+t_levels(_) ->
+    ?assertEqual(4, emqx_topic:levels(<<"a/b/c/d">>)).
+
 t_words(_) ->
     ['', <<"a">>, '+', '#'] = words(<<"/a/+/#">>),
     ['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'] = words(<<"/abkc/19383/+/akakdkkdkak/#">>),