Просмотр исходного кода

Merge pull request #1038 from emqtt/emq22

Merge emq22 to master
turtleDeng 9 лет назад
Родитель
Сommit
f730d011c4

+ 5 - 5
Makefile

@@ -1,18 +1,18 @@
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.1.2
-
-DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog
+PROJECT_VERSION = 2.2
 
+DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt
 
 dep_goldrush     = git https://github.com/basho/goldrush 0.1.9
 dep_gproc        = git https://github.com/uwiger/gproc
 dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
-dep_esockd       = git https://github.com/emqtt/esockd v4.2
-dep_mochiweb     = git https://github.com/emqtt/mochiweb
+dep_esockd       = git https://github.com/emqtt/esockd emq22
+dep_mochiweb     = git https://github.com/emqtt/mochiweb emq22
 dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_lager_syslog = git https://github.com/basho/lager_syslog
+dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master
 
 ERLC_OPTS += +'{parse_transform, lager_transform}'
 

+ 0 - 3
etc/acl.conf

@@ -24,6 +24,3 @@
 
 {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
 
-{allow, all}.
-
-

Разница между файлами не показана из-за своего большого размера
+ 222 - 53
etc/emq.conf


+ 8 - 7
include/emqttd.hrl

@@ -84,6 +84,7 @@
           keepalive = 0,
           will_topic    :: undefined | binary(),
           ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
+          mountpoint    :: undefined | binary(),
           connected_at  :: erlang:timestamp()
         }).
 
@@ -157,8 +158,8 @@
 %%--------------------------------------------------------------------
 
 -record(mqtt_route,
-        { topic   :: binary(),
-          node    :: node()
+        { topic :: binary(),
+          node  :: node()
         }).
 
 -type(mqtt_route() :: #mqtt_route{}).
@@ -168,11 +169,11 @@
 %%--------------------------------------------------------------------
 
 -record(mqtt_alarm,
-        { id          :: binary(),
-          severity    :: warning | error | critical,
-          title       :: iolist() | binary(),
-          summary     :: iolist() | binary(),
-          timestamp   :: erlang:timestamp()
+        { id        :: binary(),
+          severity  :: warning | error | critical,
+          title     :: iolist() | binary(),
+          summary   :: iolist() | binary(),
+          timestamp :: erlang:timestamp()
         }).
 
 -type(mqtt_alarm() :: #mqtt_alarm{}).

+ 382 - 90
priv/emq.schema

@@ -1,13 +1,37 @@
 %%-*- mode: erlang -*-
 %% EMQ config mapping
 
+%%--------------------------------------------------------------------
+%% Cluster
+%%--------------------------------------------------------------------
+
+%% Cluster ID
+{mapping, "cluster.id", "emqttd.cluster", [
+  {default, "emq"},
+  {datatype, string}
+]}.
+
+%% Cluster Multicast Addr
+{mapping, "cluster.multicast", "emqttd.cluster", [
+  {default, "239.192.0.1:44369"},
+  {datatype, string}
+]}.
+
+{translation, "emqttd.cluster", fun(Conf) ->
+    Multicast = cuttlefish:conf_get("cluster.multicast", Conf),
+    [Addr, Port] = string:tokens(Multicast, ":"),
+    {ok, Ip} = inet_parse:address(Addr),
+    [{id, cuttlefish:conf_get("cluster.id", Conf)},
+     {multicast, {Ip, list_to_integer(Port)}}]
+end}.
+
 %%--------------------------------------------------------------------
 %% Erlang Node
 %%--------------------------------------------------------------------
 
 %% @doc Erlang node name
 {mapping, "node.name", "vm_args.-name", [
-  {default, "emqttd@127.0.0.1"}
+  {default, "emq@127.0.0.1"}
 ]}.
 
 %% @doc Secret cookie for distributed erlang node
@@ -282,6 +306,12 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%% @doc ACL nomatch
+{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [
+  {default, allow},
+  {datatype, {enum, [allow, deny]}}
+]}.
+
 %% @doc Default ACL File
 {mapping, "mqtt.acl_file", "emqttd.acl_file", [
   {datatype, string},
@@ -328,6 +358,12 @@ end}.
 %% MQTT Client
 %%--------------------------------------------------------------------
 
+%% @doc Max Publish Rate of Message
+{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
 %% @doc Client Idle Timeout.
 {mapping, "mqtt.client.idle_timeout", "emqttd.client", [
   {default, "30s"},
@@ -340,9 +376,9 @@ end}.
   {datatype, flag}
 ]}.
 
-%% @doc Client
 {translation, "emqttd.client", fun(Conf) ->
-  [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)},
+  [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)},
+   {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)},
    {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}]
 end}.
 
@@ -350,6 +386,12 @@ end}.
 %% MQTT Session
 %%--------------------------------------------------------------------
 
+%% @doc Max Number of Subscriptions Allowed
+{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
 %% @doc Upgrade QoS?
 {mapping, "mqtt.session.upgrade_qos", "emqttd.session", [
   {default, off},
@@ -393,72 +435,80 @@ end}.
   {datatype, {duration, ms}}
 ]}.
 
+%% @doc Ignore message from self publish
+{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
 {translation, "emqttd.session", fun(Conf) ->
-  [{upgrade_qos,       cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
+  [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)},
+   {upgrade_qos,       cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
    {max_inflight,      cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
    {retry_interval,    cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
    {max_awaiting_rel,  cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
    {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
    {enable_stats,      cuttlefish:conf_get("mqtt.session.enable_stats", Conf)},
-   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}]
+   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)},
+   {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}]
 end}.
 
 %%--------------------------------------------------------------------
-%% MQTT Queue
+%% MQTT MQueue
 %%--------------------------------------------------------------------
 
 %% @doc Type: simple | priority
-{mapping, "mqtt.queue.type", "emqttd.queue", [
+{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [
   {default, simple},
   {datatype, atom}
 ]}.
 
 %% @doc Topic Priority: 0~255, Default is 0
-{mapping, "mqtt.queue.priority", "emqttd.queue", [
+{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [
   {default, ""},
   {datatype, string}
 ]}.
 
-%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
-{mapping, "mqtt.queue.max_length", "emqttd.queue", [
-  {default, infinity},
-  {datatype, [integer, {atom, infinity}]}
+%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit.
+{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [
+  {default, 0},
+  {datatype, integer}
 ]}.
 
 %% @doc Low-water mark of queued messages
-{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
+{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [
   {default, "20%"},
   {datatype, string}
 ]}.
 
 %% @doc High-water mark of queued messages
-{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
+{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [
   {default, "60%"},
   {datatype, string}
 ]}.
 
 %% @doc Queue Qos0 messages?
-{mapping, "mqtt.queue.qos0", "emqttd.queue", [
+{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [
   {default, true},
   {datatype, {enum, [true, false]}}
 ]}.
 
-{translation, "emqttd.queue", fun(Conf) ->
+{translation, "emqttd.mqueue", fun(Conf) ->
   Parse = fun(S) ->
 			{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
 			list_to_integer(N) / 100
 	      end,
-  Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)},
-          {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)},
-          {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))},
-          {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))},
-          {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}],
-  case cuttlefish:conf_get("mqtt.queue.priority", Conf) of
+  Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)},
+          {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)},
+          {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))},
+          {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))},
+          {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}],
+  case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of
     undefined -> Opts;
-    V -> [{priority,
-			 [begin [T, P] = string:tokens(S, "="),
-					{T, list_to_integer(P)}
-		      end || S <- string:tokens(V, ",")]}|Opts]
+    V         -> [{priority,
+                   [begin [T, P] = string:tokens(S, "="),
+                          {T, list_to_integer(P)}
+                    end || S <- string:tokens(V, ",")]} | Opts]
   end
 end}.
 
@@ -531,165 +581,388 @@ end}.
 %% MQTT Listeners
 %%--------------------------------------------------------------------
 
-{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
-  %% {default, 1883},
+%%--------------------------------------------------------------------
+%% TCP Listeners
+
+{mapping, "listener.tcp.$name", "emqttd.listeners", [
   {datatype, [integer, ip]}
 ]}.
 
-{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
+{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [
   {default, 8},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
+{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [
   {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
+{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [
   {default, undefined},
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
+{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [
+  %%{default, off},
+  {datatype, flag}
+]}.
+
+{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [
+  %%{default, "5s"},
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [
   {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
-  {datatype, integer},
+{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
   hidden
 ]}.
 
-{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
-  {datatype, integer},
+{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
   hidden
 ]}.
 
-{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
-  {datatype, integer},
+{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
   hidden
 ]}.
 
-{mapping, "mqtt.listener.tcp.tune_buffer", "emqttd.listeners", [
-  {default, off},
-  {datatype, flag}
+{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
 ]}.
 
-{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
+{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [
   {datatype, {enum, [true, false]}},
   hidden
 ]}.
 
-{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
-  %% {default, 8883},
+%%--------------------------------------------------------------------
+%% SSL Listeners
+
+{mapping, "listener.ssl.$name", "emqttd.listeners", [
   {datatype, [integer, ip]}
 ]}.
 
-{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [
   {default, 8},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
-  {default, 512},
+{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [
+  {default, undefined},
+  {datatype, string}
+]}.
+
+{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [
+  %%{default, off},
+  {datatype, flag}
+]}.
+
+{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [
+  %%{default, "5s"},
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [
+  {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [
   {default, "15s"},
   {datatype, {duration, ms}}
 ]}.
 
-{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [
   {datatype, atom}
 ]}.
 
-{mapping, "mqtt.listener.ssl.fail_if_no_peer_cert", "emqttd.listeners", [
+{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [
   {datatype, {enum, [true, false]}}
 ]}.
 
-{mapping, "mqtt.listener.http", "emqttd.listeners", [
-  %% {default, 8083},
+{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [
+  {datatype, flag}
+]}.
+
+{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [
+  {datatype, flag}
+]}.
+
+{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [
+  {datatype, {enum, [cn, dn]}}
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT/WebSocket Listeners
+
+{mapping, "listener.ws.$name", "emqttd.listeners", [
   {datatype, [integer, ip]}
 ]}.
 
-{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
+{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [
   {default, 8},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
-  {default, 64},
+{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ws.$name.zone", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [
+  {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.https", "emqttd.listeners", [
-  %%{default, 8084},
+{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT/WebSocket/SSL Listeners
+
+{mapping, "listener.wss.$name", "emqttd.listeners", [
   {datatype, [integer, ip]}
 ]}.
 
-{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
+{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [
   {default, 8},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
-  {default, 64},
+{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [
+  {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [
-  {default, 15},
+{mapping, "listener.wss.$name.zone", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [
+  {default, 1024},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [
+{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [
+  {default, "15s"},
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [
+{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [
+{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [
   {datatype, string}
 ]}.
 
-{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [
+{mapping, "listener.wss.$name.verify", "emqttd.listeners", [
   {datatype, atom}
 ]}.
 
-{mapping, "mqtt.listener.https.fail_if_no_peer_cert", "emqttd.listeners", [
+{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [
   {datatype, {enum, [true, false]}}
 ]}.
 
 {translation, "emqttd.listeners", fun(Conf) ->
+
     Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+
+    Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end,
+
+    Access = fun(S) ->
+                 [A, CIDR] = string:tokens(S, " "),
+                 {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end}
+             end,
+
+    AccOpts = fun(Prefix) ->
+                  case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of
+                      [] -> [];
+                      Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}]
+                  end
+              end,
+
+    MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
+
+    ConnOpts = fun(Prefix) ->
+                   Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
+                           {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)},
+                           {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
+                           {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
+                           {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
+                           {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}])
+               end,
+
     LisOpts = fun(Prefix) ->
                   Filter([{acceptors,   cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
                           {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
-                          {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}])
+                          {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)])
               end,
     TcpOpts = fun(Prefix) ->
                    Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
@@ -707,29 +980,48 @@ end}.
                                 L -> [list_to_atom(V) || V <- L]
                             end,
                   Filter([{versions, Versions},
-                          {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined},
+                          {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))},
+                          {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
+                          {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
                           {keyfile,    cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
                           {certfile,   cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
                           {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
                           {verify,     cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)},
-                          {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}])
+                          {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)},
+                          {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)},
+                          {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)},
+                          {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}])
               end,
 
-    Listeners = fun(Name) when is_atom(Name) ->
-                    Key = "mqtt.listener." ++ atom_to_list(Name),
-                    case cuttlefish:conf_get(Key, Conf, undefined) of
-                        undefined ->
-                            [];
-                        Port ->
-                            ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
-                            Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
-                            [{Name, Port, case Name =:= ssl orelse Name =:= https of
-                                              true  -> [{sslopts, SslOpts(Key)} | Opts];
-                                              false -> Opts
-                                          end}]
-                   end
-                end,
-    lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)])
+    TcpListeners = fun(Type, Name) ->
+                       Prefix = string:join(["listener", Type, Name], "."),
+                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
+                           undefined ->
+                               [];
+                           ListenOn ->
+                               [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
+                       end
+                   end,
+
+    SslListeners = fun(Type, Name) ->
+                       Prefix = string:join(["listener", Type, Name], "."),
+                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
+                           undefined ->
+                               [];
+                           ListenOn ->
+                               [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
+                                                        {sockopts, TcpOpts(Prefix)},
+                                                        {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}]
+                       end
+                   end,
+
+    lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn}
+                                               <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
+                                               ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)]
+                  ++
+                  [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
+                                               <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
+                                               ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
 end}.
 
 %%--------------------------------------------------------------------

Разница между файлами не показана из-за своего большого размера
+ 1 - 1
rebar.config


+ 12 - 12
src/emqttd.app.src

@@ -1,12 +1,12 @@
-{application, emqttd, [
-    {description, "Erlang MQTT Broker"},
-    {vsn, "2.1.2"},
-    {modules, []},
-    {registered, [emqttd_sup]},
-    {applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2]},
-    {env, []},
-    {mod, {emqttd_app, []}},
-    {maintainers, ["Feng Lee <feng@emqtt.io>"]},
-    {licenses, ["Apache-2.0"]},
-    {links, [{"Github", "https://github.com/emqtt/emqttd"}]}
-]}.
+{application,emqttd,
+             [{description,"Erlang MQTT Broker"},
+              {vsn,"2.2"},
+              {modules,[]},
+              {registered,[emqttd_sup]},
+              {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,
+                             lager_syslog,pbkdf2,bcrypt]},
+              {env,[]},
+              {mod,{emqttd_app,[]}},
+              {maintainers,["Feng Lee <feng@emqtt.io>"]},
+              {licenses,["Apache-2.0"]},
+              {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}.

+ 4 - 10
src/emqttd_access_control.erl

@@ -71,16 +71,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
       PubSub :: pubsub(),
       Topic  :: binary()).
 check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
-    case lookup_mods(acl) of
-        []      -> case emqttd:env(allow_anonymous, false) of
-                       true  -> allow;
-                       false -> deny
-                   end;
-        AclMods -> check_acl(Client, PubSub, Topic, AclMods)
-    end.
-check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
-    lager:error("ACL: nomatch for ~s ~s ~s", [ClientId, PubSub, Topic]),
-    allow;
+    check_acl(Client, PubSub, Topic, lookup_mods(acl)).
+
+check_acl(_Client, _PubSub, _Topic, []) ->
+    emqttd:env(acl_nomatch, allow);
 check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
     case Mod:check_acl({Client, PubSub, Topic}, State) of
         allow  -> allow;

+ 3 - 3
src/emqttd_acl_internal.erl

@@ -30,7 +30,7 @@
 
 -define(ACL_RULE_TAB, mqtt_acl_rule).
 
--record(state, {config, nomatch = allow}).
+-record(state, {config}).
 
 %%--------------------------------------------------------------------
 %% API
@@ -86,11 +86,11 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
       State  :: #state{}).
 check_acl(_Who, #state{config = undefined}) ->
     allow;
-check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) ->
+check_acl({Client, PubSub, Topic}, #state{}) ->
     case match(Client, Topic, lookup(PubSub)) of
         {matched, allow} -> allow;
         {matched, deny}  -> deny;
-        nomatch          -> Default
+        nomatch          -> ignore
     end.
 
 lookup(PubSub) ->

+ 9 - 2
src/emqttd_auth_mod.erl

@@ -61,8 +61,15 @@ passwd_hash(sha,    Password)  ->
 passwd_hash(sha256, Password)  ->
     hexstring(crypto:hash(sha256, Password));
 passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) ->
-    {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen),
-    pbkdf2:to_hex(Hexstring).
+    case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of
+        {ok,Hexstring} -> pbkdf2:to_hex(Hexstring);
+        {error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), error
+    end;
+passwd_hash(bcrypt, {Salt, Password}) ->
+    case bcrypt:hashpw(Password, Salt) of
+        {ok, HashPassword} -> list_to_binary(HashPassword);
+        {error, Error}-> lager:error("PasswdHash with bcrypt error:~p", [Error]), error
+    end.
 
 hexstring(<<X:128/big-unsigned-integer>>) ->
     iolist_to_binary(io_lib:format("~32.16.0b", [X]));

+ 14 - 10
src/emqttd_client.erl

@@ -55,7 +55,7 @@
 %% Unused fields: connname, peerhost, peerport
 -record(client_state, {connection, peername, conn_state, await_recv,
                        rate_limit, packet_size, parser, proto_state,
-                       keepalive, enable_stats, force_gc_count}).
+                       keepalive, enable_stats, idle_timeout, force_gc_count}).
 
 -define(INFO_KEYS, [peername, conn_state, await_recv]).
 
@@ -112,8 +112,9 @@ do_init(Conn, Env, Peername) ->
     RateLimit = get_value(rate_limit, Conn:opts()),
     PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
     Parser = emqttd_parser:initial_state(PacketSize),
-    ProtoState = emqttd_protocol:init(Peername, SendFun, Env),
+    ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env),
     EnableStats = get_value(client_enable_stats, Env, false),
+    IdleTimout = get_value(client_idle_timeout, Env, 30000),
     ForceGcCount = emqttd_gc:conn_max_gc_count(),
     State = run_socket(#client_state{connection     = Conn,
                                      peername       = Peername,
@@ -124,8 +125,8 @@ do_init(Conn, Env, Peername) ->
                                      parser         = Parser,
                                      proto_state    = ProtoState,
                                      enable_stats   = EnableStats,
+                                     idle_timeout   = IdleTimout,
                                      force_gc_count = ForceGcCount}),
-    IdleTimout = get_value(client_idle_timeout, Env, 30000),
     gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
                            {backoff, 2000, 2000, 20000}).
 
@@ -275,9 +276,11 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
 handle_info(Info, State) ->
     ?UNEXPECTED_INFO(Info, State).
 
-terminate(Reason, #client_state{connection  = Conn,
-                                keepalive   = KeepAlive,
-                                proto_state = ProtoState}) ->
+terminate(Reason, State = #client_state{connection  = Conn,
+                                        keepalive   = KeepAlive,
+                                        proto_state = ProtoState}) ->
+
+    ?LOG(debug, "Terminated for ~p", [Reason], State),
     Conn:fast_close(),
     emqttd_keepalive:cancel(KeepAlive),
     case {ProtoState, Reason} of
@@ -300,12 +303,13 @@ code_change(_OldVsn, State, _Extra) ->
 received(<<>>, State) ->
     {noreply, gc(State), hibernate};
 
-received(Bytes, State = #client_state{parser      = Parser,
-                                      packet_size = PacketSize,
-                                      proto_state = ProtoState}) ->
+received(Bytes, State = #client_state{parser       = Parser,
+                                      packet_size  = PacketSize,
+                                      proto_state  = ProtoState,
+                                      idle_timeout = IdleTimeout}) ->
     case catch emqttd_parser:parse(Bytes, Parser) of
         {more, NewParser} ->
-            {noreply, run_socket(State#client_state{parser = NewParser}), hibernate};
+            {noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout};
         {ok, Packet, Rest} ->
             emqttd_metrics:received(Packet),
             case emqttd_protocol:received(Packet, ProtoState) of

+ 3 - 1
src/emqttd_cluster.erl

@@ -73,12 +73,14 @@ remove(Node) when Node =:= node() ->
     {error, {cannot_remove_self, Node}};
 
 remove(Node) ->
-    case rpc:call(Node, ?MODULE, prepare, []) of
+    case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of
         ok ->
             case emqttd_mnesia:remove_from_cluster(Node) of
                 ok    -> rpc:call(Node, ?MODULE, reboot, []);
                 Error -> Error
             end;
+        false ->
+            {error, node_not_in_cluster};
         {badrpc, nodedown} ->
             emqttd_mnesia:remove_from_cluster(Node);
         {badrpc, Reason} ->

+ 33 - 27
src/emqttd_mqueue.erl

@@ -58,25 +58,27 @@
 
 -define(HIGH_WM, 0.6).
 
+-define(PQUEUE, priority_queue).
+
 -type(priority() :: {iolist(), pos_integer()}).
 
 -type(option() :: {type, simple | priority}
-                | {max_length, pos_integer() | infinity}
+                | {max_length, non_neg_integer()} %% Max queue length
                 | {priority, list(priority())}
                 | {low_watermark, float()}  %% Low watermark
                 | {high_watermark, float()} %% High watermark
-                | {queue_qos0, boolean()}). %% Queue Qos0?
+                | {store_qos0, boolean()}). %% Queue Qos0?
 
--type(stat() :: {max_len, infinity | pos_integer()}
+-type(stat() :: {max_len, non_neg_integer()}
               | {len, non_neg_integer()}
               | {dropped, non_neg_integer()}).
 
 -record(mqueue, {type :: simple | priority,
-                 name, q :: queue:queue() | priority_queue:q(),
+                 name, q :: queue:queue() | ?PQUEUE:q(),
                  %% priority table
                  pseq = 0, priorities = [],
                  %% len of simple queue
-                 len = 0, max_len = infinity,
+                 len = 0, max_len = 0,
                  low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
                  qos0 = false, dropped = 0,
                  alarm_fun}).
@@ -89,19 +91,19 @@
 -spec(new(iolist(), list(option()), fun()) -> mqueue()).
 new(Name, Opts, AlarmFun) ->
     Type = get_value(type, Opts, simple),
-    MaxLen = get_value(max_length, Opts, infinity),
+    MaxLen = get_value(max_length, Opts, 0),
     init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
                    len = 0, max_len = MaxLen,
                    low_wm = low_wm(MaxLen, Opts),
                    high_wm = high_wm(MaxLen, Opts),
-                   qos0 = get_value(queue_qos0, Opts, false),
+                   qos0 = get_value(store_qos0, Opts, false),
                    alarm_fun = AlarmFun}, Opts).
 
 init_q(MQ = #mqueue{type = simple}, _Opts) ->
     MQ#mqueue{q = queue:new()};
 init_q(MQ = #mqueue{type = priority}, Opts) ->
     Priorities = get_value(priority, Opts, []),
-    init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
+    init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
 
 init_p([], MQ) ->
     MQ;
@@ -113,13 +115,13 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
     <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
     {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
 
-low_wm(infinity, _Opts) ->
-    infinity;
+low_wm(0, _Opts) ->
+    undefined;
 low_wm(MaxLen, Opts) ->
     round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
 
-high_wm(infinity, _Opts) ->
-    infinity;
+high_wm(0, _Opts) ->
+    undefined;
 high_wm(MaxLen, Opts) ->
     round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
 
@@ -132,12 +134,12 @@ type(#mqueue{type = Type}) ->
     Type.
 
 is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
-is_empty(#mqueue{type = priority, q = Q})   -> priority_queue:is_empty(Q).
+is_empty(#mqueue{type = priority, q = Q})   -> ?PQUEUE:is_empty(Q).
 
 len(#mqueue{type = simple, len = Len}) -> Len;
-len(#mqueue{type = priority, q = Q})   -> priority_queue:len(Q).
+len(#mqueue{type = priority, q = Q})   -> ?PQUEUE:len(Q).
 
-max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
+max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
 
 %% @doc Dropped of the mqueue
 -spec(dropped(mqueue()) -> non_neg_integer()).
@@ -148,14 +150,14 @@ dropped(#mqueue{dropped = Dropped}) -> Dropped.
 stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
     [{len, case Type of
                 simple   -> Len;
-                priority -> priority_queue:len(Q)
+                priority -> ?PQUEUE:len(Q)
             end} | [{max_len, MaxLen}, {dropped, Dropped}]].
 
 %% @doc Enqueue a message.
 -spec(in(mqtt_message(), mqueue()) -> mqueue()).
 in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
     MQ;
-in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
+in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
     MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
 in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
     when Len >= MaxLen ->
@@ -166,43 +168,45 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
 
 in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
                                                     priorities = Priorities,
-                                                    max_len = infinity}) ->
+                                                    max_len = 0}) ->
     case lists:keysearch(Topic, 1, Priorities) of
         {value, {_, Pri}} ->
-            MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
+            MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)};
         false ->
             {Pri, MQ1} = insert_p(Topic, 0, MQ),
-            MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+            MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
     end;
 in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
                                                     priorities = Priorities,
                                                     max_len = MaxLen}) ->
     case lists:keysearch(Topic, 1, Priorities) of
         {value, {_, Pri}} ->
-            case priority_queue:plen(Pri, Q) >= MaxLen of
+            case ?PQUEUE:plen(Pri, Q) >= MaxLen of
                 true ->
-                    {_, Q1} = priority_queue:out(Pri, Q),
-                    MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)};
+                    {_, Q1} = ?PQUEUE:out(Pri, Q),
+                    MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)};
                 false ->
-                    MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+                    MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
             end;
         false ->
             {Pri, MQ1} = insert_p(Topic, 0, MQ),
-            MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
+            MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
     end.
 
 out(MQ = #mqueue{type = simple, len = 0}) ->
     {empty, MQ};
-out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
+out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
     {R, Q2} = queue:out(Q),
     {R, MQ#mqueue{q = Q2, len = Len - 1}};
 out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
     {R, Q2} = queue:out(Q),
     {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
 out(MQ = #mqueue{type = priority, q = Q}) ->
-    {R, Q2} = priority_queue:out(Q),
+    {R, Q2} = ?PQUEUE:out(Q),
     {R, MQ#mqueue{q = Q2}}.
 
+maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) ->
+    MQ;
 maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
     when Len > HighWM ->
     Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
@@ -213,6 +217,8 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun
 maybe_set_alarm(MQ) ->
     MQ.
 
+maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) ->
+    MQ;
 maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
     when Len < LowWM ->
     MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};

+ 73 - 31
src/emqttd_protocol.erl

@@ -27,7 +27,7 @@
 -import(proplists, [get_value/2, get_value/3]).
 
 %% API
--export([init/3, info/1, stats/1, clientid/1, client/1, session/1]).
+-export([init/3, init/4, info/1, stats/1, clientid/1, client/1, session/1]).
 
 -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]).
 
@@ -43,12 +43,12 @@
 -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
                       clean_sess, proto_ver, proto_name, username, is_superuser,
                       will_msg, keepalive, max_clientid_len, session, stats_data,
-                      ws_initial_headers, connected_at}).
+                      mountpoint, ws_initial_headers, connected_at}).
 
 -type(proto_state() :: #proto_state{}).
 
 -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name,
-                    keepalive, will_msg, ws_initial_headers, connected_at]).
+                    keepalive, will_msg, ws_initial_headers, mountpoint, connected_at]).
 
 -define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 
@@ -63,12 +63,22 @@ init(Peername, SendFun, Opts) ->
     WsInitialHeaders = get_value(ws_initial_headers, Opts),
     #proto_state{peername           = Peername,
                  sendfun            = SendFun,
-                 client_pid         = self(),
                  max_clientid_len   = MaxLen,
                  is_superuser       = false,
+                 client_pid         = self(),
                  ws_initial_headers = WsInitialHeaders,
                  stats_data         = #proto_stats{enable_stats = EnableStats}}.
 
+init(Conn, Peername, SendFun, Opts) ->
+    enrich_opt(Conn:opts(), Conn, init(Peername, SendFun, Opts)).
+
+enrich_opt([], _Conn, State) ->
+    State;
+enrich_opt([{mountpoint, MountPoint} | ConnOpts], Conn, State) ->
+    enrich_opt(ConnOpts, Conn, State#proto_state{mountpoint = MountPoint});
+enrich_opt([_ | ConnOpts], Conn, State) ->
+    enrich_opt(ConnOpts, Conn, State).
+
 info(ProtoState) ->
     ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
 
@@ -87,6 +97,7 @@ client(#proto_state{client_id          = ClientId,
                     keepalive          = Keepalive,
                     will_msg           = WillMsg,
                     ws_initial_headers = WsInitialHeaders,
+                    mountpoint         = MountPoint,
                     connected_at       = Time}) ->
     WillTopic = if
                     WillMsg =:= undefined -> undefined;
@@ -101,6 +112,7 @@ client(#proto_state{client_id          = ClientId,
                  keepalive          = Keepalive,
                  will_topic         = WillTopic,
                  ws_initial_headers = WsInitialHeaders,
+                 mountpoint         = MountPoint,
                  connected_at       = Time}.
 
 session(#proto_state{session = Session}) ->
@@ -167,13 +179,13 @@ process(?CONNECT_PACKET(Var), State0) ->
                          keep_alive = KeepAlive,
                          client_id  = ClientId} = Var,
 
-    State1 = State0#proto_state{proto_ver  = ProtoVer,
-                                proto_name = ProtoName,
-                                username   = Username,
-                                client_id  = ClientId,
-                                clean_sess = CleanSess,
-                                keepalive  = KeepAlive,
-                                will_msg   = willmsg(Var),
+    State1 = State0#proto_state{proto_ver    = ProtoVer,
+                                proto_name   = ProtoName,
+                                username     = Username,
+                                client_id    = ClientId,
+                                clean_sess   = CleanSess,
+                                keepalive    = KeepAlive,
+                                will_msg     = willmsg(Var, State0),
                                 connected_at = os:timestamp()},
 
     {ReturnCode1, SessPresent, State3} =
@@ -240,10 +252,11 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
 
 %% TODO: refactor later...
 process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable),
-        State = #proto_state{session      = Session,
-                             client_id    = ClientId,
+        State = #proto_state{client_id    = ClientId,
                              username     = Username,
-                             is_superuser = IsSuperuser}) ->
+                             is_superuser = IsSuperuser,
+                             mountpoint   = MountPoint,
+                             session      = Session}) ->
     Client = client(State), TopicTable = parse_topic_table(RawTopicTable),
     AllowDenies = if
                     IsSuperuser -> [];
@@ -256,7 +269,8 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable),
         false ->
             case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
                 {ok, TopicTable1} ->
-                    emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State};
+                    emqttd_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)),
+                    {ok, State};
                 {stop, _} ->
                     {ok, State}
             end
@@ -267,12 +281,13 @@ process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?UNSUBACK_PACKET(PacketId), State);
 
 process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics),
-        State = #proto_state{client_id = ClientId,
-                             username  = Username,
-                             session   = Session}) ->
+        State = #proto_state{client_id  = ClientId,
+                             username   = Username,
+                             mountpoint = MountPoint,
+                             session    = Session}) ->
     case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
         {ok, TopicTable} ->
-            emqttd_session:unsubscribe(Session, TopicTable);
+            emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable));
         {stop, _} ->
             ok
     end,
@@ -286,11 +301,12 @@ process(?PACKET(?DISCONNECT), State) ->
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
-        #proto_state{client_id = ClientId,
-                     username  = Username,
-                     session   = Session}) ->
+        #proto_state{client_id  = ClientId,
+                     username   = Username,
+                     mountpoint = MountPoint,
+                     session    = Session}) ->
     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
-    emqttd_session:publish(Session, Msg);
+    emqttd_session:publish(Session, mount(MountPoint, Msg));
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
     with_puback(?PUBACK, Packet, State);
@@ -299,11 +315,12 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
     with_puback(?PUBREC, Packet, State).
 
 with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
-            State = #proto_state{client_id = ClientId,
-                                 username  = Username,
-                                 session   = Session}) ->
+            State = #proto_state{client_id  = ClientId,
+                                 username   = Username,
+                                 mountpoint = MountPoint,
+                                 session    = Session}) ->
     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
-    case emqttd_session:publish(Session, Msg) of
+    case emqttd_session:publish(Session, mount(MountPoint, Msg)) of
         ok ->
             send(?PUBACK_PACKET(Type, PacketId), State);
         {error, Error} ->
@@ -311,10 +328,12 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
     end.
 
 -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
-send(Msg, State = #proto_state{client_id = ClientId, username = Username})
+send(Msg, State = #proto_state{client_id  = ClientId,
+                               username   = Username,
+                               mountpoint = MountPoint})
         when is_record(Msg, mqtt_message) ->
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
-    send(emqttd_message:to_packet(Msg), State);
+    send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State);
 
 send(Packet = ?PACKET(Type),
      State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
@@ -371,8 +390,11 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
     %% emqttd_cm:unreg(ClientId).
     ok.
 
-willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
-    emqttd_message:from_packet(Packet).
+willmsg(Packet, #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) ->
+    case emqttd_message:from_packet(Packet) of
+        undefined -> undefined;
+        Msg -> mount(MountPoint, Msg)
+    end.
 
 %% Generate a client if if nulll
 maybe_set_clientid(State = #proto_state{client_id = NullId})
@@ -513,3 +535,23 @@ check_acl(subscribe, Topic, Client) ->
 
 sp(true)  -> 1;
 sp(false) -> 0.
+
+%%--------------------------------------------------------------------
+%% Mount Point
+%%--------------------------------------------------------------------
+
+mount(undefined, Any) ->
+    Any;
+mount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
+    Msg#mqtt_message{topic = <<MountPoint/binary, Topic/binary>>};
+mount(MountPoint, TopicTable) when is_list(TopicTable) ->
+    [{<<MountPoint/binary, Topic/binary>>, Opts} || {Topic, Opts} <- TopicTable].
+
+unmount(undefined, Any) ->
+    Any;
+unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
+    case catch split_binary(Topic, byte_size(MountPoint)) of
+        {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0};
+        _ -> Msg
+    end.
+

+ 24 - 13
src/emqttd_session.erl

@@ -77,6 +77,8 @@
 -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3,
          handle_pre_hibernate/1]).
 
+-define(MQueue, emqttd_mqueue).
+
 -record(state,
         {
          %% Clean Session Flag
@@ -124,7 +126,7 @@
          %% QoS 1 and QoS 2 messages pending transmission to the Client.
          %%
          %% Optionally, QoS 0 messages pending transmission to the Client.
-         mqueue :: emqttd_mqueue:mqueue(),
+         mqueue :: ?MQueue:mqueue(),
 
          %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
          awaiting_rel :: map(),
@@ -150,7 +152,9 @@
          %% Force GC Count
          force_gc_count :: undefined | integer(),
 
-         created_at :: erlang:timestamp()
+         created_at :: erlang:timestamp(),
+
+         ignore_loop_deliver = false :: boolean()
         }).
 
 -define(TIMEOUT, 60000).
@@ -257,12 +261,9 @@ stats(#state{max_subscriptions = MaxSubscriptions,
                   {subscriptions,     maps:size(Subscriptions)},
                   {max_inflight,      MaxInflight},
                   {inflight_len,      Inflight:size()},
-                  {max_mqueue,        case emqttd_mqueue:max_len(MQueue) of
-                                        infinity -> 0;
-                                        Len -> Len
-                                      end},
-                  {mqueue_len,        emqttd_mqueue:len(MQueue)},
-                  {mqueue_dropped,    emqttd_mqueue:dropped(MQueue)},
+                  {max_mqueue,        ?MQueue:max_len(MQueue)},
+                  {mqueue_len,        ?MQueue:len(MQueue)},
+                  {mqueue_dropped,    ?MQueue:dropped(MQueue)},
                   {max_awaiting_rel,  MaxAwaitingRel},
                   {awaiting_rel_len,  maps:size(AwaitingRel)},
                   {deliver_msg,       get(deliver_msg)},
@@ -282,11 +283,12 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
     true = link(ClientPid),
     init_stats([deliver_msg, enqueue_msg]),
     {ok, Env} = emqttd:env(session),
-    {ok, QEnv} = emqttd:env(queue),
+    {ok, QEnv} = emqttd:env(mqueue),
     MaxInflight = get_value(max_inflight, Env, 0),
     EnableStats = get_value(enable_stats, Env, false),
+    IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
     ForceGcCount = emqttd_gc:conn_max_gc_count(),
-    MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
+    MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
     State = #state{clean_sess        = CleanSess,
                    binding           = binding(ClientPid),
                    client_id         = ClientId,
@@ -305,7 +307,8 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
                    expiry_interval   = get_value(expiry_interval, Env),
                    enable_stats      = EnableStats,
                    force_gc_count    = ForceGcCount,
-                   created_at        = os:timestamp()},
+                   created_at        = os:timestamp(),
+                   ignore_loop_deliver = IgnoreLoopDeliver},
     emqttd_sm:register_session(ClientId, CleanSess, info(State)),
     emqttd_hooks:run('session.created', [ClientId, Username]),
     {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}.
@@ -526,6 +529,14 @@ handle_cast({destroy, ClientId},
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
+%% Dispatch message from self publish
+handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, 
+             State = #state{client_id = ClientId, 
+                            ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
+    case IgnoreLoopDeliver of
+        true  -> {noreply, State, hibernate};
+        false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}
+    end;
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
     {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};
@@ -698,7 +709,7 @@ dispatch(Msg = #mqtt_message{qos = QoS},
 
 enqueue_msg(Msg, State = #state{mqueue = Q}) ->
     inc_stats(enqueue_msg),
-    State#state{mqueue = emqttd_mqueue:in(Msg, Q)}.
+    State#state{mqueue = ?MQueue:in(Msg, Q)}.
 
 %%--------------------------------------------------------------------
 %% Deliver
@@ -765,7 +776,7 @@ dequeue(State = #state{inflight = Inflight}) ->
     end.
 
 dequeue2(State = #state{mqueue = Q}) ->
-    case emqttd_mqueue:out(Q) of
+    case ?MQueue:out(Q) of
         {empty, _Q} ->
             State;
         {{value, Msg}, Q1} ->

+ 1 - 1
src/emqttd_ws_client.erl

@@ -93,7 +93,7 @@ init([Env, WsPid, Req, ReplyChannel]) ->
     Headers = mochiweb_headers:to_list(
                 mochiweb_request:get(headers, Req)),
     Conn = Req:get(connection),
-    ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
+    ProtoState = emqttd_protocol:init(Conn, Peername, send_fun(ReplyChannel),
                                       [{ws_initial_headers, Headers} | Env]),
     IdleTimeout = get_value(client_idle_timeout, Env, 30000),
     EnableStats = get_value(client_enable_stats, Env, false),