Преглед изворни кода

Merge pull request #733 from emqtt/emq20

2.0-rc.2
Feng Lee пре 9 година
родитељ
комит
1b957e998f

+ 1 - 0
.gitignore

@@ -27,3 +27,4 @@ ct.coverdata
 .idea/
 emqttd.iml
 _rel/
+data/

+ 21 - 9
Makefile

@@ -2,14 +2,22 @@ PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
 PROJECT_VERSION = 2.0
 
-DEPS = gproc lager gen_logger gen_conf esockd mochiweb
-
-dep_gproc      = git https://github.com/uwiger/gproc
-dep_lager      = git https://github.com/basho/lager
-dep_gen_conf   = git https://github.com/emqtt/gen_conf
-dep_gen_logger = git https://github.com/emqtt/gen_logger
-dep_esockd     = git https://github.com/emqtt/esockd emq20
-dep_mochiweb   = git https://github.com/emqtt/mochiweb
+DEPS = gproc lager gen_logger esockd mochiweb getopt pbkdf2 \
+	   clique time_compat rand_compat
+
+dep_gproc       = git https://github.com/uwiger/gproc
+dep_getopt      = git https://github.com/jcomellas/getopt v0.8.2
+dep_lager       = git https://github.com/basho/lager master
+dep_gen_logger  = git https://github.com/emqtt/gen_logger
+dep_esockd      = git https://github.com/emqtt/esockd emq20
+dep_mochiweb    = git https://github.com/emqtt/mochiweb
+dep_clique      = git https://github.com/basho/clique
+dep_pbkdf2      = git https://github.com/basho/erlang-pbkdf2 2.0.0
+dep_time_compat = git https://github.com/lasp-lang/time_compat
+dep_rand_compat = git https://github.com/lasp-lang/rand_compat
+
+TEST_DEPS = cuttlefish
+dep_cuttlefish = git https://github.com/emqtt/cuttlefish
 
 ERLC_OPTS += +'{parse_transform, lager_transform}'
 
@@ -20,7 +28,9 @@ EUNIT_OPTS = verbose
 # EUNIT_ERL_OPTS =
 
 CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_mod emqttd_net \
-			emqttd_mqueue emqttd_protocol emqttd_topic emqttd_trie
+			emqttd_mqueue emqttd_protocol emqttd_topic emqttd_trie \
+			emqttd_vm
+
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
 
 COVER = true
@@ -29,3 +39,5 @@ include erlang.mk
 
 app:: rebar.config
 
+app.config::
+	cuttlefish -l info -e etc/ -c etc/emq.conf -i priv/emq.schema -d data/

+ 29 - 0
etc/acl.conf

@@ -0,0 +1,29 @@
+%%--------------------------------------------------------------------
+%%
+%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
+%%
+%% -type who() :: all | binary() |
+%%                {ipaddr, esockd_access:cidr()} |
+%%                {client, binary()} |
+%%                {user, binary()}.
+%%
+%% -type access() :: subscribe | publish | pubsub.
+%%
+%% -type topic() :: binary().
+%%
+%% -type rule() :: {allow, all} |
+%%                 {allow, who(), access(), list(topic())} |
+%%                 {deny, all} |
+%%                 {deny, who(), access(), list(topic())}.
+%%
+%%--------------------------------------------------------------------
+
+{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
+
+{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
+
+{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
+
+{allow, all}.
+
+

+ 1 - 0
etc/certs/README

@@ -0,0 +1 @@
+Place your SSL/TLS Certificates here.

+ 15 - 0
etc/certs/cacert.pem

@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICZTCCAc4CCQCPzzI1ezeZPTANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD
+TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK
+EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv
+bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkwNzQ5WhcNMTYxMTEyMDkwNzQ5WjB3MQsw
+CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91
+MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG
+9w0BCQEWDWhvbmdAZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGB
+AJ/6ACaLPXP6wpGOqc9+jFRFN6ufODqGB5SamCNmOKXpSm/U+KT87NPg4i4wn31s
+167nb65lk3IbdvzPzTSCAP6DG5s0+qDgpEMHeKKEC4zaAwoIxCgVUjab51RbVFBs
+AhzowxdRl6jQrGVgvXiLzz1+3b+1Xydu5J5Z2IeLm8NPAgMBAAEwDQYJKoZIhvcN
+AQEFBQADgYEAkt/VWi4tUUEdOnDwnCZ4IheV9Sp+6T3XsRxje7PKDsvZQlmpvMP6
+StfM+wkxty2dxVOU5Sx8CwXk5roKvULQY5rAyn9log6vEAI4Oyr4vnRN24JF7/Tr
+xeP1cOv2LJlEuQm1JWe+VtNqfJ+f81CnfaJMAo17W5T/5UxI5n8ziKc=
+-----END CERTIFICATE-----

+ 15 - 0
etc/certs/cert.pem

@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICZjCCAc+gAwIBAgIJAO89PfgaeHB2MA0GCSqGSIb3DQEBBQUAMHcxCzAJBgNV
+BAYTAkNOMREwDwYDVQQIEwhaaGVKaWFuZzERMA8GA1UEBxMISGFuZ1pob3UxETAP
+BgNVBAoTCGVtcXR0LmlvMREwDwYDVQQDEwhlbXF0dC5pbzEcMBoGCSqGSIb3DQEJ
+ARYNaG9uZ0BlbXF0dC5pbzAeFw0xNjEwMTMwOTEzMTFaFw0xNjExMTIwOTEzMTFa
+MEYxCzAJBgNVBAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzERMA8GA1UEBwwISGFu
+Z1pob3UxETAPBgNVBAsMCGVtcXR0LmlvMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB
+iQKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12+UdTVHRF
+d94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIImus9pjm5l
+OgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQABoyswKTAJ
+BgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAPBgNVHREECDAGhwR/AAABMA0GCSqGSIb3
+DQEBBQUAA4GBAGS1yw1w9H7F4uOaK02mUCHZiV+EBB3gkBBqtAx7TXsmoGgT6ySA
+7DwrbX6IH82bhZT4TjouhaPlUPE9pin88d/2kNbRrbZoZDMYGq02mVVRxfLzJqM2
+GVlsxebsFFPbYhOaf9TuRR3v13ebga0FrXNke+IGLsYZSM2PZ+F4EvIA
+-----END CERTIFICATE-----

+ 14 - 0
etc/certs/client-cert.pem

@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICITCCAYoCCQDvPT34GnhwdzANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD
+TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK
+EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv
+bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkxNTMxWhcNMTYxMTEyMDkxNTMxWjB3MQsw
+CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91
+MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG
+9w0BCQEWDWhvbmdAZW1xdHQuaW8wXDANBgkqhkiG9w0BAQEFAANLADBIAkEAx1yF
+I3YnvDPtHpGzJ+9ZGnnKkvMdaoyawT9rPvLsteeDkfknJcGCV5mKmjvH1xeeMIN1
+Kql9nVPoe7BtzJ0XwQIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAALulKuZuE6RhwIT
+JBrUN7j4dbJe7Ttz+Q3qSQq6hNJoDf8hNrAHUDQzov9yU/KMMi9xE6+hu+ieuTo6
+hKLBDAD4hDzb6+EU5HAcASDkAXWnQq/Keo73+VrmUwMQs93tTC/jGXpsj/gLMEWB
+xcxXpgBPDGIR9L8Y2YMhEBLjm7Zv
+-----END CERTIFICATE-----

+ 9 - 0
etc/certs/client-key.pem

@@ -0,0 +1,9 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIBOwIBAAJBAMdchSN2J7wz7R6RsyfvWRp5ypLzHWqMmsE/az7y7LXng5H5JyXB
+gleZipo7x9cXnjCDdSqpfZ1T6HuwbcydF8ECAwEAAQJAOGtblmyS1DVRzsvnCs82
+xUJgbPP2iDfgd/4tqLPw/41T9d4RimhfNMUF9n+9IZPCGPXGGc4OYQttNq+w/BG/
+vQIhAPlZCz+fI1OEcqNB5BjYRrZU+6KtkSaKXRL/2e2yAmbTAiEAzK4WOv+Zs8x7
+aG9pO2SOy38qCBOq1xfohFJnPaWaEpsCIQCE1zaR75NfdEmqxnjh759ElmP1WCjj
+coWBkMMmylZTNwIgAeFPfvc+GDK2p3zugIcp8KCYaD6WASfNEPoYzK4qviUCIQDt
+sTM3JZeInrLoDJwhgrmMFDjlf7XZ+LF7uYDRuE+7jw==
+-----END RSA PRIVATE KEY-----

+ 15 - 0
etc/certs/key.pem

@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12
++UdTVHRFd94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIIm
+us9pjm5lOgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQAB
+AoGBAKyew61vllxfjtPJeCYvL3WE38ZqIKiHBufQ3hhYM60H0tNu6OiONE/EpN02
+/wWbIjXG2VfOL6ui8FVzYSqU3xt8kD+zs3+Q4Qz+UEe9bwLEysswWyQA/YJtOS60
+FPxedT/gs0SAQP4MAc/FeFXOXVSzX4nVMoVeLpjvakh3hQNBAkEA8yObp8U6WCdE
+p5TBQl5CyYJq4lyZ+d8DLp/v6gK7P4Nm8tTeSNmOtoG2nA6/VtSI75jm9yl5owzF
+CZn3654GSQJBAOXDhcQhEjR+NMUR62a1YRaBK36ZtiJXFdIV4g16BUBp6q4UDAiV
+3GBVuCYsykXT6V+3cR0vvUQUbpfAUl1BQ5UCQQCDytVgx2OszPxF6jgnhXimSe8t
+7Av6iYvsBf3B1uEwuEVhc0laK7NT8lPNm6DTrDjdxv/LEcxBOXbEkZT1Pp8hAkB3
+tvdkqKKWrUeLgvm3azwqAKWL8kUfAWcCLpq40OIZnNZFW3alpofLvf4UDfRai76m
+O6t5PJ2N8mNpODDyHAY9AkAqbnmnmbswN+ayB80357N8298GLBqlG+A6YHps7SnR
+K+4poUZgdhs0e5zh7jAR7cyQuQnKC7LMR0ZRH1WTs97V
+-----END RSA PRIVATE KEY-----

+ 282 - 0
etc/emq.conf

@@ -0,0 +1,282 @@
+##--------------------------------------------------------------------
+## Node Args
+##--------------------------------------------------------------------
+
+## Node name
+node.name = emqttd@127.0.0.1
+
+## Cookie for distributed node
+node.cookie = emq_dist_cookie
+
+## SMP support: enable, auto, disable
+node.smp = auto
+
+## Enable kernel poll
+node.kernel_poll = on
+
+## async thread pool
+node.async_threads = 32
+
+## Erlang Process Limit
+node.process_limit = 256000
+
+## Sets the maximum number of simultaneously existing ports for this system
+node.max_ports = 65536
+
+## Set the distribution buffer busy limit (dist_buf_busy_limit)
+node.dist_buffer_size = 32MB
+
+## Max ETS Tables.
+## Note that mnesia and SSL will create temporary ets tables.
+node.max_ets_tables = 256000
+
+## Tweak GC to run more often
+node.fullsweep_after = 1000
+
+## Crash dump
+node.crash_dump = log/crash.dump
+
+## Distributed node ticktime
+node.dist_net_ticktime = 60
+
+## Distributed node port range
+## node.dist_listen_min = 6000
+## node.dist_listen_max = 6999
+
+##--------------------------------------------------------------------
+## Log
+##--------------------------------------------------------------------
+
+## Console log. Enum: off, file, console, both
+log.console = console
+
+## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
+log.console.level = error
+
+## Console log file
+## log.console.file = log/console.log
+
+## Error log file
+log.error.file = log/error.log
+
+## Enable the crash log. Enum: on, off
+log.crash = on
+
+log.crash.file = log/crash.log
+
+##--------------------------------------------------------------------
+## MQTT Protocol
+##--------------------------------------------------------------------
+
+## Max ClientId Length Allowed.
+mqtt.max_clientid_len = 1024
+
+## Max Packet Size Allowed, 64K by default.
+mqtt.max_packet_size = 64KB
+
+## Client Idle Timeout (Second)
+mqtt.client_idle_timeout = 30
+
+## Allow Anonymous authentication
+mqtt.allow_anonymous = true
+
+## Default ACL File
+mqtt.acl_file = etc/acl.conf
+
+##--------------------------------------------------------------------
+## MQTT Session
+##--------------------------------------------------------------------
+
+## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
+## 0 means no limit
+mqtt.session.max_inflight = 100
+
+## Retry interval for redelivering QoS1/2 messages.
+mqtt.session.retry_interval = 60
+
+## Awaiting PUBREL Timeout
+mqtt.session.await_rel_timeout = 20
+
+## Max Packets that Awaiting PUBREL, 0 means no limit
+mqtt.session.max_awaiting_rel = 0
+
+## Statistics Collection Interval(seconds)
+mqtt.session.collect_interval = 0
+
+## Expired after 1 day:
+## w - week
+## d - day
+## h - hour
+## m - minute
+## s - second
+mqtt.session.expired_after = 1d
+
+##--------------------------------------------------------------------
+## MQTT Queue
+##--------------------------------------------------------------------
+
+## Type: simple | priority
+mqtt.queue.type = simple
+
+## Topic Priority: 0~255, Default is 0
+## mqtt.queue.priority = topic/1=10,topic/2=8
+
+## Max queue length. Enqueued messages when persistent client disconnected,
+## or inflight window is full.
+mqtt.queue.max_length = infinity
+
+## Low-water mark of queued messages
+mqtt.queue.low_watermark = 20%
+
+## High-water mark of queued messages
+mqtt.queue.high_watermark = 60%
+
+## Queue Qos0 messages?
+mqtt.queue.qos0 = true
+
+##--------------------------------------------------------------------
+## MQTT Broker and PubSub
+##--------------------------------------------------------------------
+
+## System Interval of publishing broker $SYS Messages
+mqtt.broker.sys_interval = 60
+
+## PubSub Pool Size. Default should be scheduler numbers.
+mqtt.pubsub.pool_size = 8
+
+mqtt.pubsub.by_clientid = true
+
+## Subscribe Asynchronously
+mqtt.pubsub.async = true
+
+##--------------------------------------------------------------------
+## MQTT Bridge
+##--------------------------------------------------------------------
+
+## Bridge Queue Size
+mqtt.bridge.max_queue_len = 10000
+
+## Ping Interval of bridge node. Unit: Second
+mqtt.bridge.ping_down_interval = 1
+
+##-------------------------------------------------------------------
+## MQTT Plugins
+##-------------------------------------------------------------------
+
+## Dir of plugins' config
+mqtt.plugins.etc_dir = etc/plugins/
+
+## File to store loaded plugin names.
+mqtt.plugins.loaded_file = data/loaded_plugins
+
+##-------------------------------------------------------------------
+## MQTT Modules
+##-------------------------------------------------------------------
+
+## Enable retainer module
+mqtt.module.retainer = on
+
+## disc: disc_copies, ram: ram_copies
+mqtt.module.retainer.storage_type = ram
+
+## Max number of retained messages
+mqtt.module.retainer.max_message_num = 100000
+
+## Max Payload Size of retained message
+mqtt.module.retainer.max_payload_size = 64KB
+
+## Expired after seconds, never expired if 0
+mqtt.module.retainer.expired_after = 0
+
+## Enable presence module
+## Publish presence messages when client connected or disconnected.
+mqtt.module.presence = on
+
+mqtt.module.presence.qos = 0
+
+## Enable subscription module
+## Subscribe topics automatically when client connected
+mqtt.module.subscription = on
+
+mqtt.module.subscription.topics = $client/%c=1,$user/%u=1
+
+##--------------------------------------------------------------------
+## MQTT Listeners
+##--------------------------------------------------------------------
+
+## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
+mqtt.listener.tcp = 1883
+
+## Size of acceptor pool
+mqtt.listener.tcp.acceptors = 8
+
+## Maximum number of concurrent clients
+mqtt.listener.tcp.max_clients = 1024
+
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.tcp.rate_limit = 100,10
+
+## TCP Socket Options
+mqtt.listener.tcp.backlog = 1024
+## mqtt.listener.tcp.recbuf = 4096
+## mqtt.listener.tcp.sndbuf = 4096
+## mqtt.listener.tcp.buffer = 4096
+## mqtt.listener.tcp.nodelay = true
+
+## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
+mqtt.listener.ssl = 8883
+
+## Size of acceptor pool
+mqtt.listener.ssl.acceptors = 4
+
+## Maximum number of concurrent clients
+mqtt.listener.ssl.max_clients = 512
+
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.ssl.rate_limit = 100,10
+
+## Configuring SSL Options
+## See http://erlang.org/doc/man/ssl.html
+mqtt.listener.ssl.handshake_timeout = 15
+mqtt.listener.ssl.keyfile = etc/certs/key.pem
+mqtt.listener.ssl.certfile = etc/certs/cert.pem
+mqtt.listener.ssl.cacertfile = etc/certs/cacert.pem
+## mqtt.listener.ssl.verify = verify_peer
+## mqtt.listener.ssl.failed_if_no_peer_cert = true
+
+## HTTP and WebSocket Listener
+mqtt.listener.http = 8083
+mqtt.listener.http.acceptors = 4
+mqtt.listener.http.max_clients = 64
+
+## HTTP(SSL) Listener
+## mqtt.listener.https = 8084
+## mqtt.listener.https.acceptors = 4
+## mqtt.listener.https.max_clients = 64
+## mqtt.listener.https.handshake_timeout = 10
+## mqtt.listener.https.certfile = etc/certs/cert.pem
+## mqtt.listener.https.keyfile = etc/certs/key.pem
+## mqtt.listener.https.cacertfile = etc/certs/cacert.pem
+## mqtt.listener.https.verify = verify_peer
+## mqtt.listener.https.failed_if_no_peer_cert = true
+
+##-------------------------------------------------------------------
+## System Monitor
+##-------------------------------------------------------------------
+
+## Long GC, don't monitor in production mode for:
+## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+sysmon.long_gc = false
+
+## Long Schedule(ms)
+sysmon.long_schedule = 240
+
+## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
+sysmon.large_heap = 8MB
+
+## Busy Port
+sysmon.busy_port = false
+
+## Busy Dist Port
+sysmon.busy_dist_port = true
+

+ 758 - 0
priv/emq.schema

@@ -0,0 +1,758 @@
+%%-*- mode: erlang -*-
+%% EMQ config mapping
+
+%%--------------------------------------------------------------------
+%% Erlang Node
+%%--------------------------------------------------------------------
+
+%% @doc Erlang node name
+{mapping, "node.name", "vm_args.-name", [
+  {default, "emqttd@127.0.0.1"}
+]}.
+
+%% @doc Secret cookie for distributed erlang node
+{mapping, "node.cookie", "vm_args.-setcookie", [
+  {default, "emqsecretcookie"}
+]}.
+
+%% @doc SMP Support
+{mapping, "node.smp", "vm_args.-smp", [
+  {default, auto},
+  {datatype, {enum, [enable, auto, disable]}},
+  hidden
+]}.
+
+%% @doc Enable Kernel Poll
+{mapping, "node.kernel_poll", "vm_args.+K", [
+  {default, on},
+  {datatype, flag},
+  hidden
+]}.
+
+%% @doc More information at: http://erlang.org/doc/man/erl.html
+{mapping, "node.async_threads", "vm_args.+A", [
+  {default, 64},
+  {datatype, integer},
+  {validators, ["range:0-1024"]}
+]}.
+
+%% @doc Erlang Process Limit
+{mapping, "node.process_limit", "vm_args.+P", [
+  {datatype, integer},
+  {default, 256000},
+  hidden
+]}.
+
+%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
+%% @doc The number of concurrent ports/sockets
+%% Valid range is 1024-134217727
+{mapping, "node.max_ports",
+  cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
+  {default, 262144},
+  {datatype, integer},
+  {validators, ["range4ports"]}
+]}.
+
+{validator, "range4ports", "must be 1024 to 134217727",
+ fun(X) -> X >= 1024 andalso X =< 134217727 end}.
+
+%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
+{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
+  {datatype, bytesize},
+  {commented, "32MB"},
+  hidden,
+  {validators, ["zdbbl_range"]}
+]}.
+
+{translation, "vm_args.+zdbbl",
+ fun(Conf) ->
+  ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined),
+  case ZDBBL of
+    undefined -> undefined;
+    X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
+    _ -> undefined
+  end
+ end
+}.
+
+{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
+ fun(ZDBBL) ->
+  %% 2097151KB = 2147482624
+  ZDBBL >= 1024 andalso ZDBBL =< 2147482624
+ end
+}.
+
+%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
+{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
+  {default, 1000},
+  {datatype, integer},
+  hidden,
+  {validators, ["positive_integer"]}
+]}.
+
+{validator, "positive_integer", "must be a positive integer",
+  fun(X) -> X >= 0 end}.
+
+%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
+%% R16+ uses +e
+%% @doc The ETS table limit
+{mapping, "node.max_ets_tables",
+  cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
+  {default, 256000},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @doc Set the location of crash dumps
+{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
+  {default, "{{crash_dump}}"},
+  {datatype, file},
+  hidden
+]}.
+
+%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
+{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
+  {commented, 60},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @doc http://www.erlang.org/doc/man/kernel_app.html
+{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
+  {commented, 6000},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @see node.dist_listen_min
+{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
+  {commented, 6999},
+  {datatype, integer},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% Log
+%%--------------------------------------------------------------------
+
+{mapping, "log.console", "lager.handlers", [
+  {default, file },
+  {datatype, {enum, [off, file, console, both]}}
+]}.
+
+{mapping, "log.console.level", "lager.handlers", [
+  {default, info},
+  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
+]}.
+
+{mapping, "log.console.file", "lager.handlers", [
+  {default, "log/console.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.file", "lager.handlers", [
+  {default, "log/error.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.redirect", "lager.error_logger_redirect", [
+  {default, on},
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
+  {default, 1000},
+  {datatype, integer},
+  hidden
+]}.
+
+{translation,
+ "lager.handlers",
+ fun(Conf) ->
+    ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
+      undefined -> [];
+      ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
+                                              {level, error},
+                                              {size, 10485760},
+                                              {date, "$D0"},
+                                              {count, 5}]}]
+    end,
+
+    ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
+    ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
+
+    ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
+    ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
+                                               {level, ConsoleLogLevel},
+                                               {size, 10485760},
+                                               {date, "$D0"},
+                                               {count, 5}]},
+
+    ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
+      off -> [];
+      file -> [ConsoleFileHandler];
+      console -> [ConsoleHandler];
+      both -> [ConsoleHandler, ConsoleFileHandler];
+      _ -> []
+    end,
+    ConsoleHandlers ++ ErrorHandler
+  end
+}.
+
+{mapping, "log.crash", "lager.crash_log", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "log.crash.file", "lager.crash_log", [
+  {default, "log/crash.log"},
+  {datatype, file}
+]}.
+
+{translation,
+ "lager.crash_log",
+ fun(Conf) ->
+     case cuttlefish:conf_get("log.crash", Conf) of
+         false -> undefined;
+         _ ->
+             cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
+     end
+ end}.
+
+{mapping, "sasl", "sasl.sasl_error_logger", [
+  {default, off},
+  {datatype, flag},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Protocol
+%%--------------------------------------------------------------------
+
+%% @doc Set the Max ClientId Length Allowed.
+{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+%% @doc Max Packet Size Allowed, 64K by default.
+{mapping, "mqtt.max_packet_size", "emqttd.protocol", [
+  {default, "64KB"},
+  {datatype, bytesize}
+]}.
+
+%% @doc Client Idle Timeout.
+{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
+  {default, 30},
+  {datatype, integer}
+]}.
+
+{translation, "emqttd.protocol", fun(Conf) ->
+  [{max_clientid_len,    cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
+   {max_packet_size,     cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
+   {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
+end}.
+
+%% @doc Allow Anonymous
+{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
+  {default, false},
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+%% @doc Default ACL File
+{mapping, "mqtt.acl_file", "emqttd.acl_file", [
+  {datatype, string},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Session
+%%--------------------------------------------------------------------
+
+%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
+%% 0 means no limit
+{mapping, "mqtt.session.max_inflight", "emqttd.session", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
+
+%% @doc Retry interval for redelivering QoS1/2 messages.
+{mapping, "mqtt.session.retry_interval", "emqttd.session", [
+  {default, 60},
+  {datatype, integer}
+]}.
+
+%% @doc Awaiting PUBREL Timeout
+{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
+  {default, 30},
+  {datatype, integer}
+]}.
+
+%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
+{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Statistics Collection Interval(seconds)
+{mapping, "mqtt.session.collect_interval", "emqttd.session", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Session expired after...
+{mapping, "mqtt.session.expired_after", "emqttd.session", [
+  {default, "2d"},
+  {datatype, {duration, s}}
+]}.
+
+{translation, "emqttd.session", fun(Conf) ->
+  [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
+   {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+   {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
+   {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
+   {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
+   {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Queue
+%%--------------------------------------------------------------------
+
+%% @doc Type: simple | priority
+{mapping, "mqtt.queue.type", "emqttd.queue", [
+  {default, simple},
+  {datatype, atom}
+]}.
+
+%% @doc Topic Priority: 0~255, Default is 0
+{mapping, "mqtt.queue.priority", "emqttd.queue", [
+  {default, ""},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
+{mapping, "mqtt.queue.max_length", "emqttd.queue", [
+  {default, infinity},
+  {datatype, [atom, integer]}
+]}.
+
+%% @doc Low-water mark of queued messages
+{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
+  {default, "20%"},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc High-water mark of queued messages
+{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
+  {default, "60%"},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Queue Qos0 messages?
+{mapping, "mqtt.queue.qos0", "emqttd.queue", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.queue", fun(Conf) ->
+  Parse = fun(S) ->
+			{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
+			list_to_integer(N) / 100
+	      end,
+  Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)},
+          {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)},
+          {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))},
+          {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))},
+          {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}],
+  case cuttlefish:conf_get("mqtt.queue.priority", Conf) of
+    undefined -> Opts;
+    V -> [{priority,
+			 [begin [T, P] = string:tokens(S, "="),
+					{T, list_to_integer(P)}
+		      end || S <- string:tokens(V, ",")]}|Opts]
+  end
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Broker
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [
+  {default, 60},
+  {datatype, integer}
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT PubSub
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
+  {default, true},
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{translation, "emqttd.pubsub", fun(Conf) ->
+  [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
+   {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)},
+   {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Bridge
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [
+  {default, 10000},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
+  {default, 1},
+  {datatype, integer}
+]}.
+
+{translation, "emqttd.bridge", fun(Conf) ->
+  [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
+   {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
+end}.
+
+%%-------------------------------------------------------------------
+%% MQTT Plugins
+%%-------------------------------------------------------------------
+
+{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [
+  {datatype, string}
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Listeners
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
+  {default, 1883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
+  {default, undefined},
+  {datatype, string},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
+  {default, 512},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
+  {default, 15},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "mqtt.listener.http", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
+  {default, 64},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https", "emqttd.listeners", [
+  {default, undefined},
+  {datatype, [integer, ip]},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
+  {default, 64},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [
+  {default, 15},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.listeners", fun(Conf) ->
+    Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+    LisOpts = fun(Prefix) ->
+                  Filter([{acceptors,   cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
+                          {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
+                          {rate_limt,   cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}])
+ end,
+    TcpOpts = fun(Prefix) ->
+                   Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
+                           {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
+                           {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
+                           {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
+                           {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
+              end,
+    SslOpts = fun(Prefix) ->
+                  Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)},
+                          {keyfile,    cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
+                          {certfile,   cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
+                          {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
+                          {verify,     cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)},
+                          {failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}])
+              end,
+
+    Listeners = fun(Name) when is_atom(Name) ->
+                    Key = "mqtt.listener." ++ atom_to_list(Name),
+                    case cuttlefish:conf_get(Key, Conf, undefined) of
+                        undefined ->
+                            [];
+                        Port ->
+                            ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
+                            Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
+                            [{Name, Port, case Name =:= ssl orelse Name =:= https of
+                                              true  -> [{ssl, SslOpts(Key)} | Opts];
+                                              false -> Opts
+                                          end}]
+                   end
+                end,
+    lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)])
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Modules
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.module.retainer", "emqttd.modules", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [
+  {default, ram},
+  {datatype, {enum, [disc, ram]}}
+]}.
+
+{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [
+  {default, 100000},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [
+  {default, "64KB"},
+  {datatype, bytesize}
+]}.
+
+{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.module.presence", "emqttd.modules", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.presence.qos", "emqttd.modules", [
+  {default, 0},
+  {datatype, integer},
+  {validators, ["range:0-2"]}
+]}.
+
+{mapping, "mqtt.module.subscription", "emqttd.modules", [
+  {default, off},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [
+  {default, undefined},
+  {datatype, string}
+]}.
+
+{translation, "emqttd.modules", fun(Conf) ->
+    WithMod = fun(Name, OptsF) ->
+                  Key = "mqtt.module." ++ atom_to_list(Name),
+                  case cuttlefish:conf_get(Key, Conf, false) of
+                      true  -> [{Name, OptsF(Key)}];
+                      false -> []
+                  end
+              end,
+    RetainOpts = fun(Prefix) ->
+                     [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)},
+                      {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)},
+                      {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)},
+                      {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}]
+                 end,
+    PresOpts = fun(Prefix) ->
+                   [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}]
+               end,
+    ParseFun = fun(undefined) -> [];
+                  (Topics)    -> [begin
+                                      [Topic, Qos] = string:tokens(S, "="),
+                                      {list_to_binary(Topic), list_to_integer(Qos)}
+                                  end || S <- string:tokens(Topics, ",")]
+               end,
+    SubOpts = fun(Prefix) -> ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf)) end,
+    lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)])
+end}.
+
+%%--------------------------------------------------------------------
+%% System Monitor
+%%--------------------------------------------------------------------
+
+%% @doc Long GC, don't monitor in production mode for:
+%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+{mapping, "sysmon.long_gc", "emqttd.sysmon", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Long Schedule(ms)
+{mapping, "sysmon.long_schedule", "emqttd.sysmon", [
+  {default, 1000},
+  {datatype, integer}
+]}.
+
+%% @doc Large Heap
+{mapping, "sysmon.large_heap", "emqttd.sysmon", [
+  {default, "8MB"},
+  {datatype, bytesize}
+]}.
+
+%% @doc Monitor Busy Port
+{mapping, "sysmon.busy_port", "emqttd.sysmon", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Monitor Busy Dist Port
+{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.sysmon", fun(Conf) ->
+    [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
+     {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
+     {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
+     {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
+     {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
+end}.
+

Разлика између датотеке није приказан због своје велике величине
+ 1 - 1
rebar.config


+ 0 - 12
src/emqttd.app.src

@@ -1,12 +0,0 @@
-{application, emqttd,
- [
-  {description, "Erlang MQTT Broker"},
-  {vsn, "2.0"},
-  {id, "emqttd"},
-  {modules, []},
-  {registered, []},
-  {applications, [kernel, stdlib, gproc, esockd, mochiweb,
-                  gen_logger, gen_conf]},
-  {mod, {emqttd_app, []}},
-  {env, []}
- ]}.

+ 2 - 9
src/emqttd.erl

@@ -22,7 +22,7 @@
 
 -include("emqttd_protocol.hrl").
 
--export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]).
+-export([start/0, env/1, env/2, is_running/1]).
 
 %% PubSub API
 -export([subscribe/1, subscribe/2, subscribe/3, publish/1,
@@ -57,15 +57,8 @@
 -spec(start() -> ok | {error, any()}).
 start() -> application:start(?APP).
 
-%% @doc Get Config
--spec(conf(Key :: atom()) -> any()).
-conf(Key) -> emqttd_conf:value(Key).
-
--spec(conf(Key :: atom(), Default :: any()) -> any()).
-conf(Key, Default) -> emqttd_conf:value(Key, Default).
-
 %% @doc Environment
--spec(env(Key:: atom()) -> any()).
+-spec(env(Key:: atom()) -> {ok, any()} | undefined).
 env(Key) -> application:get_env(?APP, Key).
 
 %% @doc Get environment

+ 9 - 18
src/emqttd_access_control.erl

@@ -56,7 +56,10 @@ start_link() ->
 auth(Client, Password) when is_record(Client, mqtt_client) ->
     auth(Client, Password, lookup_mods(auth)).
 auth(_Client, _Password, []) ->
-    {error, "No auth module to check!"};
+    case emqttd:env(allow_anonymous, false) of
+        true  -> ok;
+        false -> {error, "No auth module to check!"}
+    end;
 auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
     case catch Mod:check(Client, Password, State) of
         ok              -> ok;
@@ -73,7 +76,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
       Topic  :: binary()).
 check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) ->
     case lookup_mods(acl) of
-        []      -> allow;
+        []      -> case emqttd:env(allow_anonymous, false) of
+                       true  -> allow;
+                       false -> deny
+                   end;
         AclMods -> check_acl(Client, PubSub, Topic, AclMods)
     end.
 check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
@@ -120,21 +126,13 @@ tab_key(acl)  -> acl_modules.
 stop() -> gen_server:call(?MODULE, stop).
 
 %%--------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_server Callbacks
 %%--------------------------------------------------------------------
 
 init([]) ->
     ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
-    ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}),
-    ets:insert(?ACCESS_CONTROL_TAB, {acl_modules,  init_mods(gen_conf:list(emqttd, acl))}),
     {ok, #state{}}.
 
-init_mods(Mods) ->
-    [init_mod(mod_name(Type, Name), Opts) || {Type, Name, Opts} <- Mods].
-
-init_mod(Mod, Opts) ->
-    {ok, State} = Mod:init(Opts), {Mod, State, 0}.
-
 handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
     Mods = lookup_mods(Type),
     Existed = lists:keyfind(Mod, 1, Mods),
@@ -186,13 +184,6 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-mod_name(auth, Name) -> mod(emqttd_auth_, Name);
-
-mod_name(acl, Name)  -> mod(emqttd_acl_, Name).
-    
-mod(Prefix, Name) ->
-    list_to_atom(lists:concat([Prefix, Name])).
-
 if_existed(false, Fun) -> Fun();
 
 if_existed(_Mod, _Fun) -> {error, already_existed}.

+ 0 - 35
src/emqttd_acl_anonymous.erl

@@ -1,35 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_acl_anonymous).
-
--behaviour(emqttd_acl_mod).
-
-%% ACL callbacks
--export([init/1, check_acl/2, reload_acl/1, description/0]).
-
-init(Opts) ->
-    {ok, Opts}.
-
-check_acl(_Who, _State) ->
-    allow.
-
-reload_acl(_State) ->
-    ok.
-
-description() ->
-    "Anonymous ACL".
-

+ 6 - 12
src/emqttd_acl_internal.erl

@@ -46,18 +46,12 @@ all_rules() ->
 %%--------------------------------------------------------------------
 
 %% @doc Init internal ACL
--spec(init(Opts :: list()) -> {ok, State :: any()}).
-init(Opts) ->
+-spec(init([File :: string()]) -> {ok, State :: any()}).
+init([File]) ->
     ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
-    case proplists:get_value(config, Opts) of
-        undefined ->
-            {ok, #state{}};
-        File ->
-            Default = proplists:get_value(nomatch, Opts, allow),
-            State = #state{config = File, nomatch = Default},
-            true = load_rules_from_file(State),
-            {ok, State}
-    end.
+    State = #state{config = File},
+    true = load_rules_from_file(State),
+    {ok, State}.
 
 load_rules_from_file(#state{config = AclFile}) ->
     {ok, Terms} = file:consult(AclFile),
@@ -118,7 +112,7 @@ reload_acl(#state{config = undefined}) ->
 reload_acl(State) ->
     case catch load_rules_from_file(State) of
         {'EXIT', Error} -> {error, Error};
-        _ -> ok
+        true -> ok
     end.
 
 %% @doc ACL Module Description

+ 40 - 20
src/emqttd_app.erl

@@ -1,4 +1,4 @@
-%--------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,11 +42,11 @@
       Reason    :: term()).
 start(_StartType, _StartArgs) ->
     print_banner(),
-    emqttd_conf:init(),
     emqttd_mnesia:start(),
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     emqttd_cli:load(),
+    register_acl_mod(),
     load_all_mods(),
     emqttd_plugins:init(),
     emqttd_plugins:load(),
@@ -141,15 +141,25 @@ worker_spec(Module, Opts) when is_atom(Module) ->
 worker_spec(M, F, A) ->
     {M, {M, F, A}, permanent, 10000, worker, [M]}.
 
+%%--------------------------------------------------------------------
+%% Register default ACL File
+%%--------------------------------------------------------------------
+
+register_acl_mod() ->
+    case emqttd:env(acl_file) of
+        {ok, File} -> emqttd_access_control:register_mod(acl, emqttd_acl_internal, [File]);
+        undefined  -> ok
+    end.
+
 %%--------------------------------------------------------------------
 %% Load Modules
 %%--------------------------------------------------------------------
 
-%% @doc load all modules
+%% @doc Load all modules
 load_all_mods() ->
-    lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)).
+    lists:foreach(fun load_mod/1, emqttd:env(modules, [])).
 
-load_mod({module, Name, Opts}) ->
+load_mod({Name, Opts}) ->
     Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
     case catch Mod:load(Opts) of
         ok               -> lager:info("Load module ~s successfully", [Name]);
@@ -159,7 +169,7 @@ load_mod({module, Name, Opts}) ->
 
 %% @doc Is module enabled?
 -spec(is_mod_enabled(Name :: atom()) -> boolean()).
-is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
+is_mod_enabled(Name) -> lists:keyfind(Name, 1, emqttd:env(modules, [])).
 
 %%--------------------------------------------------------------------
 %% Start Listeners
@@ -167,28 +177,29 @@ is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
 
 %% @doc Start Listeners of the broker.
 -spec(start_listeners() -> any()).
-start_listeners() -> lists:foreach(fun start_listener/1, gen_conf:list(emqttd, listener)).
+start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners, [])).
 
 %% Start mqtt listener
 -spec(start_listener(listener()) -> any()).
-start_listener({listener, mqtt, ListenOn, Opts}) ->
-    start_listener(mqtt, ListenOn, Opts);
+start_listener({tcp, ListenOn, Opts}) ->
+    start_listener('mqtt:tcp', ListenOn, Opts);
 
 %% Start mqtt(SSL) listener
-start_listener({listener, mqtts, ListenOn, Opts}) ->
-    start_listener(mqtts, ListenOn, Opts);
+start_listener({ssl, ListenOn, Opts}) ->
+    start_listener('mqtt:ssl', ListenOn, Opts);
 
 %% Start http listener
-start_listener({listener, http, ListenOn, Opts}) ->
-    mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
+start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
+    mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []});
 
 %% Start https listener
-start_listener({listener, https, ListenOn, Opts}) ->
-    mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
+start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
+    mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}).
 
-start_listener(Protocol, ListenOn, Opts) ->
-    MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]},
-    {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
+start_listener(Proto, ListenOn, Opts) ->
+    {ok, Env} = emqttd:env(protocol),
+    MFArgs = {emqttd_client, start_link, [Env]},
+    {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
 
 merge_sockopts(Options) ->
     SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,
@@ -200,10 +211,19 @@ merge_sockopts(Options) ->
 %%--------------------------------------------------------------------
 
 %% @doc Stop Listeners
-stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)).
+stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners, [])).
 
 %% @private
-stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).
+stop_listener({tcp, ListenOn, _Opts}) ->
+    esockd:close('mqtt:tcp', ListenOn);
+stop_listener({ssl, ListenOn, _Opts}) ->
+    esockd:close('mqtt:ssl', ListenOn);
+stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
+    mochiweb:stop_http('mqtt:ws', ListenOn);
+stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
+    mochiweb:stop_http('mqtt:wss', ListenOn);
+stop_listener({Proto, ListenOn, _Opts}) ->
+    esockd:close(Proto, ListenOn).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").

+ 0 - 29
src/emqttd_auth_anonymous.erl

@@ -1,29 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc Anonymous Authentication Module
--module(emqttd_auth_anonymous).
-
--behaviour(emqttd_auth_mod).
-
--export([init/1, check/3, description/0]).
-
-init(Opts) -> {ok, Opts}.
-
-check(_Client, _Password, _Opts) -> ok.
-
-description() -> "Anonymous Authentication Module".
-

+ 0 - 123
src/emqttd_auth_clientid.erl

@@ -1,123 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_auth_clientid).
-
--include("emqttd.hrl").
-
--export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1,
-         all_clientids/0]).
-
--behaviour(emqttd_auth_mod).
-
-%% emqttd_auth_mod callbacks
--export([init/1, check/3, description/0]).
-
--define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
-
--record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-%% @doc Add clientid
--spec(add_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
-add_clientid(ClientId) when is_binary(ClientId) ->
-    R = #mqtt_auth_clientid{client_id = ClientId},
-    mnesia:transaction(fun mnesia:write/1, [R]).
-
-%% @doc Add clientid with password
--spec(add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}).
-add_clientid(ClientId, Password) ->
-    R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
-    mnesia:transaction(fun mnesia:write/1, [R]).
-
-%% @doc Lookup clientid
--spec(lookup_clientid(binary()) -> list(#mqtt_auth_clientid{})).
-lookup_clientid(ClientId) ->
-    mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
-
-%% @doc Lookup all clientids
--spec(all_clientids() -> list(binary())).
-all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
-
-%% @doc Remove clientid
--spec(remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
-remove_clientid(ClientId) ->
-    mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]).
-
-%%--------------------------------------------------------------------
-%% emqttd_auth_mod callbacks
-%%--------------------------------------------------------------------
-
-init(Opts) ->
-    mnesia:create_table(?AUTH_CLIENTID_TAB, [
-            {ram_copies, [node()]},
-            {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
-    mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
-    Clients = load_client_from(proplists:get_value(config, Opts)),
-    mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end),
-    {ok, Opts}.
-
-check(#mqtt_client{client_id = undefined}, _Password, _Opts) ->
-    {error, clientid_undefined};
-check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, []) ->
-    check_clientid_only(ClientId, IpAddress);
-check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, [{password, no}|_]) ->
-    check_clientid_only(ClientId, IpAddress);
-check(_Client, undefined, [{password, yes}|_]) ->
-    {error, password_undefined};
-check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) ->
-    case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
-        [] -> {error, clientid_not_found};
-        [#?AUTH_CLIENTID_TAB{password = Password}]  -> ok; %% TODO: plaintext??
-        _ -> {error, password_error}
-    end.
-
-description() -> "ClientId authentication module".
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-load_client_from(undefined) ->
-    ok;
-
-load_client_from(File) ->
-    {ok, Clients} = file:consult(File),
-    [client(Client) || Client <- Clients].
-
-client(ClientId) when is_list(ClientId) ->
-    #mqtt_auth_clientid{client_id = list_to_binary(ClientId)};
-
-client({ClientId, IpAddr}) when is_list(ClientId) ->
-    #mqtt_auth_clientid{client_id = iolist_to_binary(ClientId),
-                        ipaddr    = esockd_cidr:parse(IpAddr, true)}.
-
-check_clientid_only(ClientId, IpAddr) ->
-    case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
-        [] ->
-            {error, clientid_not_found};
-        [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] ->
-            ok;
-        [#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] ->
-            case esockd_cidr:match(IpAddr, CIDR) of
-                true  -> ok;
-                false -> {error, wrong_ipaddr}
-            end
-    end.
-

+ 0 - 164
src/emqttd_auth_username.erl

@@ -1,164 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc Authentication with username and password
--module(emqttd_auth_username).
-
--include("emqttd.hrl").
-
--include("emqttd_cli.hrl").
-
-%% CLI callbacks
--export([cli/1]).
-
--behaviour(emqttd_auth_mod).
-
--export([is_enabled/0]).
-
--export([add_user/2, remove_user/1, lookup_user/1, all_users/0]).
-
-%% emqttd_auth callbacks
--export([init/1, check/3, description/0]).
-
--define(AUTH_USERNAME_TAB, mqtt_auth_username).
-
--record(?AUTH_USERNAME_TAB, {username, password}).
-
-%%--------------------------------------------------------------------
-%% CLI
-%%--------------------------------------------------------------------
-cli(["list"]) ->
-    if_enabled(fun() ->
-        Usernames = mnesia:dirty_all_keys(?AUTH_USERNAME_TAB),
-        [?PRINT("~s~n", [Username]) || Username <- Usernames]
-    end);
-
-cli(["add", Username, Password]) ->
-    if_enabled(fun() ->
-        ?PRINT("~p~n", [add_user(iolist_to_binary(Username), iolist_to_binary(Password))])
-    end);
-
-cli(["del", Username]) ->
-    if_enabled(fun() ->
-        ?PRINT("~p~n", [remove_user(iolist_to_binary(Username))])
-    end);
-
-cli(_) ->
-    ?USAGE([{"users list", "List users"},
-            {"users add <Username> <Password>", "Add User"},
-            {"users del <Username>", "Delete User"}]).
-
-if_enabled(Fun) ->
-    case is_enabled() of
-        true  -> Fun();
-        false -> hint()
-    end.
-
-hint() ->
-    ?PRINT_MSG("Please enable '{auth, username, []}' in etc/emqttd.conf first.~n").
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-is_enabled() ->
-    lists:member(?AUTH_USERNAME_TAB, mnesia:system_info(tables)).
-
-%% @doc Add User
--spec(add_user(binary(), binary()) -> ok | {error, any()}).
-add_user(Username, Password) ->
-    User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
-    ret(mnesia:transaction(fun insert_user/1, [User])).
-
-insert_user(User = #?AUTH_USERNAME_TAB{username = Username}) ->
-    case mnesia:read(?AUTH_USERNAME_TAB, Username) of
-        []    -> mnesia:write(User);
-        [_|_] -> mnesia:abort(existed)
-    end.
-
-add_default_user(Username, Password) when is_atom(Username) ->
-    add_default_user(atom_to_list(Username), Password);
-
-add_default_user(Username, Password) ->
-    add_user(iolist_to_binary(Username), iolist_to_binary(Password)).
-
-%% @doc Lookup user by username
--spec(lookup_user(binary()) -> list()).
-lookup_user(Username) ->
-    mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
-
-%% @doc Remove user
--spec(remove_user(binary()) -> ok | {error, any()}).
-remove_user(Username) ->
-    ret(mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}])).
-
-ret({atomic, ok})     -> ok;
-ret({aborted, Error}) -> {error, Error}.
-
-%% @doc All usernames
--spec(all_users() -> list()).
-all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
-
-%%--------------------------------------------------------------------
-%% emqttd_auth_mod callbacks
-%%--------------------------------------------------------------------
-
-init(Opts) ->
-    mnesia:create_table(?AUTH_USERNAME_TAB, [
-            {disc_copies, [node()]},
-            {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
-    mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
-    case proplists:get_value(passwd, Opts) of
-        undefined -> ok;
-        File -> {ok, DefaultUsers} = file:consult(File),
-                lists:foreach(fun({Username, Password}) ->
-                        add_default_user(Username, Password)
-                end, DefaultUsers)
-    end,
-    emqttd_ctl:register_cmd(users, {?MODULE, cli}, []),
-    {ok, Opts}.
-
-check(#mqtt_client{username = undefined}, _Password, _Opts) ->
-    {error, username_undefined};
-check(_User, undefined, _Opts) ->
-    {error, password_undefined};
-check(#mqtt_client{username = Username}, Password, _Opts) ->
-    case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
-        [] ->
-            {error, username_not_found};
-        [#?AUTH_USERNAME_TAB{password = <<Salt:4/binary, Hash/binary>>}] ->
-            case Hash =:= md5_hash(Salt, Password) of
-                true -> ok;
-                false -> {error, password_error}
-            end
-    end.
-
-description() ->
-    "Username password authentication module".
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-hash(Password) ->
-    SaltBin = salt(), <<SaltBin/binary, (md5_hash(SaltBin, Password))/binary>>.
-
-md5_hash(SaltBin, Password) ->
-    erlang:md5(<<SaltBin/binary, Password/binary>>).
-
-salt() ->
-    emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <<Salt:32>>.
-

+ 2 - 1
src/emqttd_bridge_sup_sup.erl

@@ -46,7 +46,8 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
 start_bridge(Node, _Topic, _Options) when Node =:= node() ->
     {error, bridge_to_self};
 start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
-    Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
+    {ok, BridgeEnv} = emqttd:env(bridge),
+    Options1 = emqttd_opts:merge(BridgeEnv, Options),
     supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
 
 %% @doc Stop a bridge

+ 1 - 1
src/emqttd_broker.erl

@@ -95,7 +95,7 @@ datetime() ->
 
 %% @doc Start a tick timer
 start_tick(Msg) ->
-    start_tick(timer:seconds(emqttd:conf(broker_sys_interval, 60)), Msg).
+    start_tick(timer:seconds(emqttd:env(broker_sys_interval, 60)), Msg).
 
 start_tick(0, _Msg) ->
     undefined;

+ 34 - 38
src/emqttd_cli.erl

@@ -143,7 +143,7 @@ cluster(_) ->
 
 %%--------------------------------------------------------------------
 %% @doc Users usage
-users(Args) -> emqttd_auth_username:cli(Args).
+users(Args) -> emq_auth_username:cli(Args).
 
 %%--------------------------------------------------------------------
 %% @doc Query clients
@@ -174,11 +174,11 @@ sessions(["list"]) ->
 
 %% performance issue?
 sessions(["list", "persistent"]) ->
-    lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', false, '_', '_'}));
+    lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'}));
 
 %% performance issue?
 sessions(["list", "transient"]) ->
-    lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', true, '_', '_'}));
+    lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'}));
 
 sessions(["show", ClientId]) ->
     case ets:lookup(mqtt_local_session, bin(ClientId)) of
@@ -219,7 +219,7 @@ topics(_) ->
 subscriptions(["list"]) ->
     lists:foreach(fun(Subscription) ->
                       print(subscription, Subscription)
-                  end, []); %%emqttd:subscriptions());
+                  end, ets:tab2list(mqtt_subscription));
 
 subscriptions(["show", ClientId]) ->
     case ets:lookup(mqtt_subscription, bin(ClientId)) of
@@ -227,35 +227,30 @@ subscriptions(["show", ClientId]) ->
         Records -> [print(subscription, Subscription) || Subscription <- Records]
     end;
 
-%%
-%% subscriptions(["add", ClientId, Topic, QoS]) ->
-%%    Add = fun(IntQos) ->
-%%            Subscription = #mqtt_subscription{subid = bin(ClientId),
-%%                                              topic = bin(Topic),
-%%                                              qos   = IntQos},
-%%            case emqttd_backend:add_subscription(Subscription) of
-%%                ok ->
-%%                    ?PRINT_MSG("ok~n");
-%%                {error, already_existed} ->
-%%                    ?PRINT_MSG("Error: already existed~n");
-%%                {error, Reason} ->
-%%                    ?PRINT("Error: ~p~n", [Reason])
-%%            end
-%%          end,
-%%    if_valid_qos(QoS, Add);
-%%
 
-%%
-%% subscriptions(["del", ClientId]) ->
-%%    Ok = emqttd_backend:del_subscriptions(bin(ClientId)),
-%%    ?PRINT("~p~n", [Ok]);
-%%
+subscriptions(["add", ClientId, Topic, QoS]) ->
+   Add = fun(IntQos) ->
+           case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
+               ok ->
+                   ?PRINT_MSG("ok~n");
+               {error, already_existed} ->
+                   ?PRINT_MSG("Error: already existed~n");
+               {error, Reason} ->
+                   ?PRINT("Error: ~p~n", [Reason])
+           end
+         end,
+   if_valid_qos(QoS, Add);
+
+
+
+subscriptions(["del", ClientId]) ->
+   Ok = emqttd:subscriber_down(bin(ClientId)),
+   ?PRINT("~p~n", [Ok]);
+
+subscriptions(["del", ClientId, Topic]) ->
+   Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)),
+   ?PRINT("~p~n", [Ok]);
 
-%%
-%% subscriptions(["del", ClientId, Topic]) ->
-%%    Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)),
-%%    ?PRINT("~p~n", [Ok]);
-%%
 
 subscriptions(_) ->
     ?USAGE([{"subscriptions list",                         "List all subscriptions"},
@@ -515,8 +510,9 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = User
 print(#mqtt_route{topic = Topic, node = Node}) ->
     ?PRINT("~s -> ~s~n", [Topic, Node]);
 
-print({ClientId, _ClientPid, CleanSess, SessInfo}) ->
-    InfoKeys = [max_inflight,
+print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
+    InfoKeys = [clean_sess,
+                max_inflight,
                 inflight_queue,
                 message_queue,
                 message_dropped,
@@ -528,12 +524,12 @@ print({ClientId, _ClientPid, CleanSess, SessInfo}) ->
            "message_queue=~w, message_dropped=~w, "
            "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
            "created_at=~w)~n",
-            [ClientId, CleanSess | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
+            [ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
 
-print(subscription, {Sub, Topic, Opts}) when is_pid(Sub) ->
-    ?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]);
-print(subscription, {Sub, Topic, Opts}) ->
-    ?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts]).
+print(subscription, {Sub, Topic}) when is_pid(Sub) ->
+    ?PRINT("~p -> ~s~n", [Sub, Topic]);
+print(subscription, {Sub, Topic}) ->
+    ?PRINT("~s -> ~s~n", [Sub, Topic]).
 
 format(created_at, Val) ->
     emqttd_time:now_to_secs(Val);

+ 0 - 112
src/emqttd_conf.erl

@@ -1,112 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_conf).
-
--export([init/0]).
-
--export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]).
-
--export([value/1, value/2, list/1]).
-
--define(APP, emqttd).
-
-init() -> gen_conf:init(?APP).
-
-mqtt() ->
-    with_env(mqtt_protocol, [
-        %% Max ClientId Length Allowed.
-        {max_clientid_len,    value(mqtt_max_clientid_len, 512)},
-        %% Max Packet Size Allowed, 64K by default.
-        {max_packet_size,     value(mqtt_max_packet_size, 65536)},
-        %% Client Idle Timeout.
-        {client_idle_timeout, value(mqtt_client_idle_timeout, 30)}
-    ]).
-
-session() ->
-    with_env(mqtt_session, [
-        %% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
-        %% 0 means no limit
-        {max_inflight,         value(session_max_inflight, 100)},
-
-        %% Retry interval for redelivering QoS1/2 messages.
-        {unack_retry_interval, value(session_unack_retry_interval, 60)},
-
-        %% Awaiting PUBREL Timeout
-        {await_rel_timeout,    value(session_await_rel_timeout, 20)},
-
-        %% Max Packets that Awaiting PUBREL, 0 means no limit
-        {max_awaiting_rel,     value(session_max_awaiting_rel, 0)},
-
-        %% Statistics Collection Interval(seconds)
-        {collect_interval,     value(session_collect_interval, 0)},
-
-        %% Expired after 2 day (unit: minute)
-        {expired_after,        value(session_expired_after, 2880)}
-    ]).
-
-queue() ->
-    with_env(mqtt_queue, [
-        %% Type: simple | priority
-        {type,             value(queue_type, simple)},
-
-        %% Topic Priority: 0~255, Default is 0
-        {priority,         value(queue_priority, [])},
-
-        %% Max queue length. Enqueued messages when persistent client disconnected,
-        %% or inflight window is full.
-        {max_length,       value(queue_max_length, infinity)},
-
-        %% Low-water mark of queued messages
-        {low_watermark,    value(queue_low_watermark, 0.2)},
-
-        %% High-water mark of queued messages
-        {high_watermark,   value(queue_high_watermark, 0.6)},
-
-        %% Queue Qos0 messages?
-        {queue_qos0,       value(queue_qos0, true)}
-    ]).
-
-bridge() ->
-    with_env(mqtt_bridge, [
-        {max_queue_len,      value(bridge_max_queue_len, 10000)},
-
-        %% Ping Interval of bridge node
-        {ping_down_interval, value(bridge_ping_down_interval, 1)}
-    ]).
-
-pubsub() ->
-    with_env(mqtt_pubsub, [
-        %% PubSub and Router. Default should be scheduler numbers.
-        {pool_size, value(pubsub_pool_size, 8)}
-    ]).
-
-value(Key) ->
-    with_env(Key, gen_conf:value(?APP, Key)).
-
-value(Key, Default) ->
-    with_env(Key, gen_conf:value(?APP, Key, Default)).
-
-with_env(Key, Conf) ->
-    case application:get_env(?APP, Key) of
-        undefined ->
-            application:set_env(?APP, Key, Conf), Conf;
-        {ok, Val} ->
-            Val
-    end.
-
-list(Key) -> gen_conf:list(?APP, Key).
-

+ 0 - 115
src/emqttd_mod_rewrite.erl

@@ -1,115 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_mod_rewrite).
-
--behaviour(emqttd_gen_mod).
-
--include("emqttd.hrl").
-
--export([load/1, reload/1, unload/1]).
-
--export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-load(Opts) ->
-    case proplists:get_value(config, Opts) of
-        undefined ->
-            ok;
-        File ->
-            {ok, Terms} = file:consult(File), Sections = compile(Terms),
-            emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Sections]),
-            emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/4, [Sections]),
-            emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections])
-    end.
-
-rewrite_subscribe(_ClientId, _Username, TopicTable, Sections) ->
-    lager:info("Rewrite subscribe: ~p", [TopicTable]),
-    {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
-
-rewrite_unsubscribe(_ClientId, _Username, TopicTable, Sections) ->
-    lager:info("Rewrite unsubscribe: ~p", [TopicTable]),
-    {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
-
-rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
-    %%TODO: this will not work if the client is always online.
-    RewriteTopic =
-    case get({rewrite, Topic}) of
-        undefined ->
-            DestTopic = match_topic(Topic, Sections),
-            put({rewrite, Topic}, DestTopic), DestTopic;
-        DestTopic ->
-            DestTopic
-        end,
-    {ok, Message#mqtt_message{topic = RewriteTopic}}.
-
-reload(File) ->
-    %%TODO: The unload api is not right...
-    case emqttd_app:is_mod_enabled(rewrite) of
-        true -> 
-            unload(state),
-            load([{file, File}]);
-        false ->
-            {error, module_unloaded}
-    end.
-            
-unload(_) ->
-    emqttd:unhook('client.subscribe',  fun ?MODULE:rewrite_subscribe/4),
-    emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4),
-    emqttd:unhook('message.publish',   fun ?MODULE:rewrite_publish/2).
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-compile(Sections) ->
-    C = fun({rewrite, Re, Dest}) ->
-       {ok, MP} = re:compile(Re),
-       {rewrite, MP, Dest}
-    end,
-    F = fun({topic, Topic, Rules}) ->
-        {topic, list_to_binary(Topic), [C(R) || R <- Rules]}
-    end,
-    [F(Section) || Section <- Sections].
-
-match_topic(Topic, []) ->
-    Topic;
-match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
-    case emqttd_topic:match(Topic, Filter) of
-        true ->
-            match_rule(Topic, Rules);
-        false ->
-            match_topic(Topic, Sections)
-    end.
-
-match_rule(Topic, []) ->
-    Topic;
-match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
-    case re:run(Topic, MP, [{capture, all_but_first, list}]) of
-        {match, Captured} ->
-            Vars = lists:zip(["\\$" ++ integer_to_list(I)
-                                || I <- lists:seq(1, length(Captured))], Captured),
-            iolist_to_binary(lists:foldl(
-                    fun({Var, Val}, Acc) ->
-                            re:replace(Acc, Var, Val, [global])
-                    end, Dest, Vars));
-        nomatch ->
-            match_rule(Topic, Rules)
-    end.
-

+ 20 - 14
src/emqttd_plugins.erl

@@ -26,23 +26,28 @@
 
 -export([list/0]).
 
+%% @doc Init plugins' config
+-spec(init() -> ok).
 init() ->
-    case emqttd:conf(plugins_etc_dir) of
+    case emqttd:env(plugins_etc_dir) of
         {ok, PluginsEtc} ->
-            CfgFiles = filelib:wildcard("*.conf", PluginsEtc),
-            lists:foreach(fun(CfgFile) ->
-                App = app_name(CfgFile),
-                application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)),
-                gen_conf:init(App)
-            end, CfgFiles);
+            CfgFiles = [filename:join(PluginsEtc, File) ||
+                          File <- filelib:wildcard("*.config", PluginsEtc)],
+            lists:foreach(fun init_config/1, CfgFiles);
         undefined ->
             ok
     end.
 
+init_config(CfgFile) ->
+    {ok, [AppsEnv]} = file:consult(CfgFile),
+    lists:foreach(fun({AppName, Envs}) ->
+                      [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
+                  end, AppsEnv).
+
 %% @doc Load all plugins when the broker started.
 -spec(load() -> list() | {error, any()}).
 load() ->
-    case emqttd:conf(plugins_loaded_file) of
+    case emqttd:env(plugins_loaded_file) of
         {ok, File} ->
             ensure_file(File),
             with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);
@@ -75,7 +80,7 @@ load_plugins(Names, Persistent) ->
 %% @doc Unload all plugins before broker stopped.
 -spec(unload() -> list() | {error, any()}).
 unload() ->
-    case emqttd:conf(plugins_loaded_file) of
+    case emqttd:env(plugins_loaded_file) of
         {ok, File} ->
             with_loaded_file(File, fun stop_plugins/1);
         undefined ->
@@ -89,9 +94,9 @@ stop_plugins(Names) ->
 %% @doc List all available plugins
 -spec(list() -> [mqtt_plugin()]).
 list() ->
-    case emqttd:conf(plugins_etc_dir) of
-        {ok, PluginsEtc} -> 
-            CfgFiles = filelib:wildcard("*.conf", PluginsEtc),
+    case emqttd:env(plugins_etc_dir) of
+        {ok, PluginsEtc} ->
+            CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc),
             Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
             StartedApps = names(started_app),
             lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
@@ -244,7 +249,7 @@ plugin_unloaded(Name, true) ->
     end.
 
 read_loaded() ->
-    case emqttd:conf(plugins_loaded_file) of
+    case emqttd:env(plugins_loaded_file) of
         {ok, File} -> read_loaded(File);
         undefined  -> {error, not_found}
     end.
@@ -252,7 +257,7 @@ read_loaded() ->
 read_loaded(File) -> file:consult(File).
 
 write_loaded(AppNames) ->
-    {ok, File} = emqttd:conf(plugins_loaded_file),
+    {ok, File} = emqttd:env(plugins_loaded_file),
     case file:open(File, [binary, write]) of
         {ok, Fd} ->
             lists:foreach(fun(Name) ->
@@ -262,3 +267,4 @@ write_loaded(AppNames) ->
             lager:error("Open File ~p Error: ~p", [File, Error]),
             {error, Error}
     end.
+

+ 1 - 1
src/emqttd_protocol.erl

@@ -448,7 +448,7 @@ authenticate(Client, Password) ->
 
 %% PUBLISH ACL is cached in process dictionary.
 check_acl(publish, Topic, Client) ->
-    IfCache = emqttd:conf(cache_acl, true),
+    IfCache = emqttd:env(cache_acl, true),
     case {IfCache, get({acl, publish, Topic})} of
         {true, undefined} ->
             AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic),

+ 3 - 3
src/emqttd_pubsub_sup.erl

@@ -32,7 +32,7 @@
 %%--------------------------------------------------------------------
 
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 pubsub_pool() ->
     hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
@@ -41,10 +41,10 @@ pubsub_pool() ->
 %% Supervisor Callbacks
 %%--------------------------------------------------------------------
 
-init([Env]) ->
+init([]) ->
+    {ok, Env} = emqttd:env(pubsub),
     %% Create ETS Tables
     [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]],
-
     {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }.
 
 %%--------------------------------------------------------------------

+ 6 - 5
src/emqttd_session.erl

@@ -214,8 +214,9 @@ unsubscribe(SessPid, TopicTable) ->
 
 init([CleanSess, {ClientId, Username}, ClientPid]) ->
     process_flag(trap_exit, true),
-    true    = link(ClientPid),
-    SessEnv = emqttd_conf:session(),
+    true = link(ClientPid),
+    {ok, QEnv} = emqttd:env(queue),
+    {ok, SessEnv} = emqttd:env(session),
     Session = #session{
             clean_sess        = CleanSess,
             client_id         = ClientId,
@@ -224,14 +225,14 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
             subscriptions     = #{},
             inflight_queue    = [],
             max_inflight      = get_value(max_inflight, SessEnv, 0),
-            message_queue     = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()),
+            message_queue     = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
             awaiting_rel      = #{},
             awaiting_ack      = #{},
             awaiting_comp     = #{},
-            retry_interval    = get_value(unack_retry_interval, SessEnv),
+            retry_interval    = get_value(retry_interval, SessEnv),
             await_rel_timeout = get_value(await_rel_timeout, SessEnv),
             max_awaiting_rel  = get_value(max_awaiting_rel, SessEnv),
-            expired_after     = get_value(expired_after, SessEnv) * 60,
+            expired_after     = get_value(expired_after, SessEnv),
             collect_interval  = get_value(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
     emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),

+ 2 - 9
src/emqttd_sysmon_sup.erl

@@ -28,15 +28,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Sysmon = {sysmon, {emqttd_sysmon, start_link, [opts()]},
+	{ok, Env} = emqttd:env(sysmon),
+    Sysmon = {sysmon, {emqttd_sysmon, start_link, [Env]},
                 permanent, 5000, worker, [emqttd_sysmon]},
     {ok, {{one_for_one, 10, 100}, [Sysmon]}}.
 
-opts() ->
-    Opts = [{long_gc,        emqttd:conf(sysmon_long_gc)},
-            {long_schedule,  emqttd:conf(sysmon_long_schedule)},
-            {large_heap,     emqttd:conf(sysmon_large_heap)},
-            {busy_port,      emqttd:conf(busy_port)},
-            {busy_dist_port, emqttd:conf(sysmon_busy_dist_port)}],
-    [{Key, Val} || {Key, {ok, Val}} <- Opts].
-

+ 1 - 1
src/emqttd_ws.erl

@@ -31,7 +31,7 @@
 %% @doc Handle WebSocket Request.
 handle_request(Req) ->
     Peer = Req:get(peer),
-    PktOpts = emqttd_conf:mqtt(),
+    {ok, PktOpts} = emqttd:env(protocol),
     ParserFun = emqttd_parser:new(PktOpts),
     {ReentryWs, ReplyChannel} = upgrade(Req),
     {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),

+ 3 - 3
src/emqttd_ws_client_sup.erl

@@ -27,7 +27,7 @@
 %% @doc Start websocket client supervisor
 -spec(start_link() -> {ok, pid()}).
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 %% @doc Start a WebSocket Client
 -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
@@ -37,8 +37,8 @@ start_client(WsPid, Req, ReplyChannel) ->
 %%--------------------------------------------------------------------
 %% Supervisor callbacks
 %%--------------------------------------------------------------------
-
-init([Env]) ->
+init([]) ->
+    {ok, Env} = emqttd:env(protocol),
     {ok, {{simple_one_for_one, 0, 1},
            [{ws_client, {emqttd_ws_client, start_link, [Env]},
              temporary, 5000, worker, [emqttd_ws_client]}]}}.

+ 32 - 10
test/emqttd_SUITE.erl

@@ -101,8 +101,8 @@ groups() ->
 init_per_suite(Config) ->
     application:start(lager),
     DataDir = proplists:get_value(data_dir, Config),
-    application:set_env(emqttd, conf, filename:join([DataDir, "emqttd.conf"])),
-    application:ensure_all_started(emqttd),
+    peg_com(DataDir),
+    start_apps(emqttd, DataDir),
     Config.
 
 end_per_suite(_Config) ->
@@ -402,14 +402,9 @@ auth_header_(User, Pass) ->
     {"Authorization","Basic " ++ Encoded}.
 
 websocket_test(_) ->
-%    Conn = esockd_connection:new(esockd_transport, nil, []),
-%    Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, 
-%                                mochiweb_headers:make([{"Sec-WebSocket-Protocol","mqtt"},
-%                                                       {"Upgrade","websocket"}
-%                                                      ])),
-    Req = "GET " ++ "/mqtt" ++" HTTP/1.1\r\nUpgrade: WebSocket\r\nConnection: Upgrade\r\n" ++ 
-	"Host: " ++ "127.0.0.1"++ "\r\n" ++
-	"Origin: http://" ++ "127.0.0.1" ++ "/\r\n\r\n",
+    Conn = esockd_connection:new(esockd_transport, nil, []),
+    Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1},
+                                mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}])),
 
     ct:log("Req:~p", [Req]),
     emqttd_http:handle_request(Req).
@@ -598,4 +593,31 @@ slave(node, Node) ->
     {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
     N.
 
+start_apps(App, DataDir) ->
+    Schema = cuttlefish_schema:files([filename:join([DataDir, atom_to_list(App) ++ ".schema"])]),
+    Conf = conf_parse:file(filename:join([DataDir, atom_to_list(App) ++ ".conf"])),
+    NewConfig = cuttlefish_generator:map(Schema, Conf),
+    Vals = proplists:get_value(App, NewConfig),
+    [application:set_env(App, Par, Value) || {Par, Value} <- Vals],
+    application:ensure_all_started(App).
+
+peg_com(DataDir) ->
+    ParsePeg = file2(3, DataDir, "conf_parse.peg"),
+    neotoma:file(ParsePeg),
+    ParseErl = file2(3, DataDir, "conf_parse.erl"),
+    compile:file(ParseErl, []),
+
+    DurationPeg = file2(3, DataDir, "cuttlefish_duration_parse.peg"),
+    neotoma:file(DurationPeg),
+    DurationErl = file2(3, DataDir, "cuttlefish_duration_parse.erl"),
+    compile:file(DurationErl, []).
+    
+
+file2(Times, Dir, FileName) when Times < 1 ->
+    filename:join([Dir, "deps", "cuttlefish","src", FileName]);
+
+file2(Times, Dir, FileName) ->
+    Dir1 = filename:dirname(Dir),
+    file2(Times - 1, Dir1, FileName).
+
 

+ 206 - 196
test/emqttd_SUITE_data/emqttd.conf

@@ -1,270 +1,280 @@
-%%===================================================================
-%%
-%% Config file for emqttd 2.0
-%%
-%% Erlang Term Syntax:
-%%
-%% {}: Tuple, usually {Key, Value}
-%% []: List, seperated by comma
-%% %%: Comment
-%%
-%%===================================================================
+##--------------------------------------------------------------------
+## Node Args
+##--------------------------------------------------------------------
 
-%%--------------------------------------------------------------------
-%% MQTT Protocol
-%%--------------------------------------------------------------------
+## Node name
+node.name = emqttd@127.0.0.1
 
-%% Max ClientId Length Allowed.
-{mqtt_max_clientid_len, 512}.
+## Cookie for distributed node
+node.cookie = emq_dist_cookie
 
-%% Max Packet Size Allowed, 64K by default.
-{mqtt_max_packet_size, 65536}.
+## SMP support: enable, auto, disable
+node.smp = auto
 
-%% Client Idle Timeout.
-{mqtt_client_idle_timeout, 30}. % Second
+## Enable kernel poll
+node.kernel_poll = on
 
-%%--------------------------------------------------------------------
-%% Authentication
-%%--------------------------------------------------------------------
+## async thread pool
+node.async_threads = 32
 
-%% Anonymous: Allow all
-{auth, anonymous, []}.
+## Erlang Process Limit
+node.process_limit = 256000
 
-%% Authentication with username, password
-{auth, username, []}.
+## Sets the maximum number of simultaneously existing ports for this system
+node.max_ports = 65536
 
-%% Authentication with clientId
-{auth, clientid, [{password, no}]}.
+## Set the distribution buffer busy limit (dist_buf_busy_limit)
+node.dist_buffer_size = 32MB
 
-%%--------------------------------------------------------------------
-%% ACL
-%%--------------------------------------------------------------------
+## Max ETS Tables.
+## Note that mnesia and SSL will create temporary ets tables.
+node.max_ets_tables = 256000
 
-{acl, anonymous, []}.
+## Tweak GC to run more often
+node.fullsweep_after = 1000
 
-{acl, internal, [{nomatch, allow}]}.
+## Crash dump
+node.crash_dump = log/crash.dump
 
-%% Cache ACL result for PUBLISH
-{cache_acl, true}.
+## Distributed node ticktime
+node.dist_net_ticktime = 60
 
-%%--------------------------------------------------------------------
-%% Broker
-%%--------------------------------------------------------------------
+## Distributed node port range
+## node.dist_listen_min = 6000
+## node.dist_listen_max = 6999
 
-%% System interval of publishing broker $SYS messages
-{broker_sys_interval, 60}.
+##--------------------------------------------------------------------
+## Log
+##--------------------------------------------------------------------
 
-%%--------------------------------------------------------------------
-%% Session
-%%--------------------------------------------------------------------
+## Console log. Enum: off, file, console, both
+log.console = console
 
-%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
-%% 0 means no limit
-{session_max_inflight, 100}.
+## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
+log.console.level = error
 
-%% Retry interval for redelivering QoS1/2 messages.
-{session_unack_retry_interval, 60}.
+## Console log file
+## log.console.file = log/console.log
 
-%% Awaiting PUBREL Timeout
-{session_await_rel_timeout, 20}.
+## Error log file
+log.error.file = log/error.log
 
-%% Max Packets that Awaiting PUBREL, 0 means no limit
-{session_max_awaiting_rel, 0}.
+## Enable the crash log. Enum: on, off
+log.crash = on
 
-%% Statistics Collection Interval(seconds)
-{session_collect_interval, 0}.
+log.crash.file = log/crash.log
 
-%% Expired after 2 day (unit: minute)
-{session_expired_after, 2880}.
+##--------------------------------------------------------------------
+## MQTT Protocol
+##--------------------------------------------------------------------
 
-%%--------------------------------------------------------------------
-%% Queue
-%%--------------------------------------------------------------------
+## Max ClientId Length Allowed.
+mqtt.max_clientid_len = 1024
 
-%% Type: simple | priority
-{queue_type, simple}.
+## Max Packet Size Allowed, 64K by default.
+mqtt.max_packet_size = 64KB
 
-%% Topic Priority: 0~255, Default is 0
-%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}.
+## Client Idle Timeout (Second)
+mqtt.client_idle_timeout = 30
 
-%% Max queue length. Enqueued messages when persistent client disconnected,
-%% or inflight window is full.
-{queue_max_length, infinity}.
+## Allow Anonymous authentication
+mqtt.allow_anonymous = true
 
-%% Low-water mark of queued messages
-{queue_low_watermark, 0.2}.
+##--------------------------------------------------------------------
+## MQTT Session
+##--------------------------------------------------------------------
 
-%% High-water mark of queued messages
-{queue_high_watermark, 0.6}.
+## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
+## 0 means no limit
+mqtt.session.max_inflight = 100
 
-%% Queue Qos0 messages?
-{queue_qos0, true}.
+## Retry interval for redelivering QoS1/2 messages.
+mqtt.session.retry_interval = 60
 
-%%--------------------------------------------------------------------
-%% Zone
-%%--------------------------------------------------------------------
+## Awaiting PUBREL Timeout
+mqtt.session.await_rel_timeout = 20
 
-{zone, admin, []}.
+## Max Packets that Awaiting PUBREL, 0 means no limit
+mqtt.session.max_awaiting_rel = 0
 
-%%--------------------------------------------------------------------
-%% Listener
-%%--------------------------------------------------------------------
+## Statistics Collection Interval(seconds)
+mqtt.session.collect_interval = 0
 
-%% Plain MQTT
-{listener, mqtt, 1883, [
-    %% Size of acceptor pool
-    {acceptors, 16},
+## Expired after 1 day:
+## w - week
+## d - day
+## h - hour
+## m - minute
+## s - second
+mqtt.session.expired_after = 1d
 
-    %% Maximum number of concurrent clients
-    {max_clients, 512},
+##--------------------------------------------------------------------
+## MQTT Queue
+##--------------------------------------------------------------------
 
-    %% Mount point prefix
-    %% {mount_point, "prefix/"},
+## Type: simple | priority
+mqtt.queue.type = simple
 
-    %% Socket Access Control
-    {access, [{allow, all}]},
+## Topic Priority: 0~255, Default is 0
+## mqtt.queue.priority = topic/1=10,topic/2=8
 
-    %% Connection Options
-    {connopts, [
-        %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec
-        %% {rate_limit, "100,10"} %% 100K burst, 10K rate
-    ]},
+## Max queue length. Enqueued messages when persistent client disconnected,
+## or inflight window is full.
+mqtt.queue.max_length = infinity
 
-    %% Socket Options
-    {sockopts, [
-        %Set buffer if hight thoughtput
-        %{recbuf, 4096},
-        %{sndbuf, 4096},
-        %{buffer, 4096},
-        %{nodelay, true},
-        {backlog, 1024}
-    ]}
-]}.
+## Low-water mark of queued messages
+mqtt.queue.low_watermark = 20%
 
-%% MQTT/SSL
-{listener, mqtts, 8883, [
-    %% Size of acceptor pool
-    {acceptors, 4},
+## High-water mark of queued messages
+mqtt.queue.high_watermark = 60%
 
-    %% Maximum number of concurrent clients
-    {max_clients, 512},
+## Queue Qos0 messages?
+mqtt.queue.qos0 = true
 
-    %% Socket Access Control
-    {access, [{allow, all}]},
+##--------------------------------------------------------------------
+## MQTT Broker and PubSub
+##--------------------------------------------------------------------
 
-    %% SSL certificate and key files
-    {ssl, [{certfile, "etc/ssl/ssl.crt"},
-           {keyfile,  "etc/ssl/ssl.key"}]},
+## System Interval of publishing broker $SYS Messages
+mqtt.broker.sys_interval = 60
 
-    %% Socket Options
-    {sockopts, [
-        {backlog, 1024}
-        %{buffer, 4096},
-    ]}
-]}.
+## PubSub Pool Size. Default should be scheduler numbers.
+mqtt.pubsub.pool_size = 8
 
-%% HTTP and WebSocket Listener
-{listener, http, 8083, [
-    %% Size of acceptor pool
-    {acceptors, 4},
+mqtt.pubsub.by_clientid = true
 
-    %% Maximum number of concurrent clients
-    {max_clients, 64},
+## Subscribe Asynchronously
+mqtt.pubsub.async = true
 
-    %% Socket Access Control
-    {access, [{allow, all}]},
+##--------------------------------------------------------------------
+## MQTT Bridge
+##--------------------------------------------------------------------
 
-    %% Socket Options
-    {sockopts, [
-        {backlog, 1024}
-        %{buffer, 4096},
-    ]}
-]}.
+## Bridge Queue Size
+mqtt.bridge.max_queue_len = 10000
 
-%%--------------------------------------------------------------------
-%% PubSub
-%%--------------------------------------------------------------------
+## Ping Interval of bridge node. Unit: Second
+mqtt.bridge.ping_down_interval = 1
 
-%% PubSub and Router. Default should be scheduler numbers.
-{pubsub_pool_size, 8}.
+##-------------------------------------------------------------------
+## MQTT Plugins
+##-------------------------------------------------------------------
 
-%%--------------------------------------------------------------------
-%% Routing
-%%--------------------------------------------------------------------
+## Dir of plugins' config
+##mqtt.plugins.etc_dir = etc/plugins/
 
-%% Route aging time(seconds)
-{routing_age, 5}.
+## File to store loaded plugin names.
+##mqtt.plugins.loaded_file = data/loaded_plugins
 
-%%--------------------------------------------------------------------
-%% Bridge
-%%--------------------------------------------------------------------
+##-------------------------------------------------------------------
+## MQTT Modules
+##-------------------------------------------------------------------
 
-%% TODO: Bridge Queue Size
-{bridge_max_queue_len, 10000}.
+## Enable retainer module
+mqtt.module.retainer = on
 
-%% Ping Interval of bridge node
-{bridge_ping_down_interval, 1}. % second
+## disc: disc_copies, ram: ram_copies
+mqtt.module.retainer.storage_type = ram
 
-%%-------------------------------------------------------------------
-%% Plugins
-%%-------------------------------------------------------------------
+## Max number of retained messages
+mqtt.module.retainer.max_message_num = 100000
 
-%% Dir of plugins' config
-{plugins_etc_dir, "etc/plugins/"}.
+## Max Payload Size of retained message
+mqtt.module.retainer.max_payload_size = 64KB
 
-%% File to store loaded plugin names.
-{plugins_loaded_file, "data/loaded_plugins"}.
+## Expired after seconds, never expired if 0
+mqtt.module.retainer.expired_after = 0
 
-%%--------------------------------------------------------------------
-%% Modules
-%%--------------------------------------------------------------------
+## Enable presence module
+## Client presence management module. Publish presence messages when
+## client connected or disconnected.
+mqtt.module.presence = on
 
-%% Retainer Module
-{module, retainer, [
+mqtt.module.presence.qos = 0
 
-    %% disc: disc_copies, ram: ram_copies
-    {storage, ram},
+## Enable subscription module
+## Subscribe topics automatically when client connected
+mqtt.module.subscription = on
 
-    %% Max number of retained messages
-    {max_message_num, 100000},
+mqtt.module.subscription.topics = $client/%c=1,$user/%u=1
 
-    %% Max Payload Size of retained message
-    {max_playload_size, 65536},
+##--------------------------------------------------------------------
+## MQTT Listeners
+##--------------------------------------------------------------------
 
-    %% Expired after seconds, never expired if 0
-    {expired_after, 0}
+## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
+mqtt.listener.tcp = 1883
 
-]}.
+## Size of acceptor pool
+mqtt.listener.tcp.acceptors = 8
 
-%% Client presence management module. Publish presence messages when 
-%% client connected or disconnected.
-{module, presence, [{qos, 0}]}.
+## Maximum number of concurrent clients
+mqtt.listener.tcp.max_clients = 1024
 
-%% Subscribe topics automatically when client connected
-{module, subscription, [{"$queue/clients/$c", 1}, backend]}.
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.tcp.rate_limit = 100,10
 
-%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite)
-{module, rewrite, []}.
+## TCP Socket Options
+mqtt.listener.tcp.backlog = 1024
+## mqtt.listener.tcp.recbuf = 4096
+## mqtt.listener.tcp.sndbuf = 4096
+## mqtt.listener.tcp.buffer = 4096
+## mqtt.listener.tcp.nodelay = true
 
-%%-------------------------------------------------------------------
-%% Erlang System Monitor
-%%-------------------------------------------------------------------
+## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
+mqtt.listener.ssl = 8883
 
-%% Long GC, don't monitor in production mode for:
-%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+## Size of acceptor pool
+mqtt.listener.ssl.acceptors = 4
 
-{sysmon_long_gc, false}.
+## Maximum number of concurrent clients
+mqtt.listener.ssl.max_clients = 512
 
-%% Long Schedule(ms)
-{sysmon_long_schedule, 240}.
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.ssl.rate_limit = 100,10
 
-%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
-%% 8 * 1024 * 1024
-{sysmon_large_heap, 8388608}.
+## Configuring SSL Options
+## See http://erlang.org/doc/man/ssl.html
+mqtt.listener.ssl.handshake_timeout = 15 #seconds
+mqtt.listener.ssl.keyfile = etc/ssl/key.pem
+mqtt.listener.ssl.certfile = etc/ssl/cert.pem
+mqtt.listener.ssl.cacertfile = etc/ssl/cacert.pem
+## mqtt.listener.ssl.verify = verify_peer
+## mqtt.listener.ssl.failed_if_no_peer_cert = true
 
-%% Busy Port
-{sysmon_busy_port, false}.
+## HTTP Listener
+mqtt.listener.http = 8083
+mqtt.listener.http.acceptors = 4
+mqtt.listener.http.max_clients = 64
 
-%% Busy Dist Port
-{sysmon_busy_dist_port, true}.
+## HTTP(SSL) Listener
+mqtt.listener.https = 8084
+mqtt.listener.https.acceptors = 4
+mqtt.listener.https.max_clients = 64
+mqtt.listener.https.handshake_timeout = 10 #seconds
+mqtt.listener.https.certfile = etc/ssl/cert.pem
+mqtt.listener.https.keyfile = etc/ssl/key.pem
+mqtt.listener.https.cacertfile = etc/ssl/cacert.pem
+## mqtt.listener.https.verify = verify_peer
+## mqtt.listener.https.failed_if_no_peer_cert = true
+
+##-------------------------------------------------------------------
+## System Monitor
+##-------------------------------------------------------------------
+
+## Long GC, don't monitor in production mode for:
+## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+sysmon.long_gc = false
+
+## Long Schedule(ms)
+sysmon.long_schedule = 240
+
+## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
+sysmon.large_heap = 8MB
+
+## Busy Port
+sysmon.busy_port = false
+
+## Busy Dist Port
+sysmon.busy_dist_port = true
 

+ 752 - 0
test/emqttd_SUITE_data/emqttd.schema

@@ -0,0 +1,752 @@
+%%-*- mode: erlang -*-
+%% EMQ config mapping
+
+%%--------------------------------------------------------------------
+%% Erlang Node
+%%--------------------------------------------------------------------
+
+%% @doc Erlang node name
+{mapping, "node.name", "vm_args.-name", [
+  {default, "emqttd@127.0.0.1"}
+]}.
+
+%% @doc Secret cookie for distributed erlang node
+{mapping, "node.cookie", "vm_args.-setcookie", [
+  {default, "emqsecretcookie"}
+]}.
+
+%% @doc SMP Support
+{mapping, "node.smp", "vm_args.-smp", [
+  {default, auto},
+  {datatype, {enum, [enable, auto, disable]}},
+  hidden
+]}.
+
+%% @doc Enable Kernel Poll
+{mapping, "node.kernel_poll", "vm_args.+K", [
+  {default, on},
+  {datatype, flag},
+  hidden
+]}.
+
+%% @doc More information at: http://erlang.org/doc/man/erl.html
+{mapping, "node.async_threads", "vm_args.+A", [
+  {default, 64},
+  {datatype, integer},
+  {validators, ["range:0-1024"]}
+]}.
+
+%% @doc Erlang Process Limit
+{mapping, "node.process_limit", "vm_args.+P", [
+  {datatype, integer},
+  {default, 256000},
+  hidden
+]}.
+
+%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
+%% @doc The number of concurrent ports/sockets
+%% Valid range is 1024-134217727
+{mapping, "node.max_ports",
+  cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
+  {default, 262144},
+  {datatype, integer},
+  {validators, ["range4ports"]}
+]}.
+
+{validator, "range4ports", "must be 1024 to 134217727",
+ fun(X) -> X >= 1024 andalso X =< 134217727 end}.
+
+%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
+{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
+  {datatype, bytesize},
+  {commented, "32MB"},
+  hidden,
+  {validators, ["zdbbl_range"]}
+]}.
+
+{translation, "vm_args.+zdbbl",
+ fun(Conf) ->
+  ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined),
+  case ZDBBL of
+    undefined -> undefined;
+    X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
+    _ -> undefined
+  end
+ end
+}.
+
+{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
+ fun(ZDBBL) ->
+  %% 2097151KB = 2147482624
+  ZDBBL >= 1024 andalso ZDBBL =< 2147482624
+ end
+}.
+
+%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
+{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
+  {default, 1000},
+  {datatype, integer},
+  hidden,
+  {validators, ["positive_integer"]}
+]}.
+
+{validator, "positive_integer", "must be a positive integer",
+  fun(X) -> X >= 0 end}.
+
+%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
+%% R16+ uses +e
+%% @doc The ETS table limit
+{mapping, "node.max_ets_tables",
+  cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
+  {default, 256000},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @doc Set the location of crash dumps
+{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
+  {default, "{{crash_dump}}"},
+  {datatype, file},
+  hidden
+]}.
+
+%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
+{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
+  {commented, 60},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @doc http://www.erlang.org/doc/man/kernel_app.html
+{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
+  {commented, 6000},
+  {datatype, integer},
+  hidden
+]}.
+
+%% @see node.dist_listen_min
+{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
+  {commented, 6999},
+  {datatype, integer},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% Log
+%%--------------------------------------------------------------------
+
+{mapping, "log.console", "lager.handlers", [
+  {default, file },
+  {datatype, {enum, [off, file, console, both]}}
+]}.
+
+{mapping, "log.console.level", "lager.handlers", [
+  {default, info},
+  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
+]}.
+
+{mapping, "log.console.file", "lager.handlers", [
+  {default, "log/console.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.file", "lager.handlers", [
+  {default, "log/error.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.redirect", "lager.error_logger_redirect", [
+  {default, on},
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
+  {default, 1000},
+  {datatype, integer},
+  hidden
+]}.
+
+{translation,
+ "lager.handlers",
+ fun(Conf) ->
+    ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
+      undefined -> [];
+      ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
+                                              {level, error},
+                                              {size, 10485760},
+                                              {date, "$D0"},
+                                              {count, 5}]}]
+    end,
+
+    ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
+    ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
+
+    ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
+    ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
+                                               {level, ConsoleLogLevel},
+                                               {size, 10485760},
+                                               {date, "$D0"},
+                                               {count, 5}]},
+
+    ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
+      off -> [];
+      file -> [ConsoleFileHandler];
+      console -> [ConsoleHandler];
+      both -> [ConsoleHandler, ConsoleFileHandler];
+      _ -> []
+    end,
+    ConsoleHandlers ++ ErrorHandler
+  end
+}.
+
+{mapping, "log.crash", "lager.crash_log", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "log.crash.file", "lager.crash_log", [
+  {default, "log/crash.log"},
+  {datatype, file}
+]}.
+
+{translation,
+ "lager.crash_log",
+ fun(Conf) ->
+     case cuttlefish:conf_get("log.crash", Conf) of
+         false -> undefined;
+         _ ->
+             cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
+     end
+ end}.
+
+{mapping, "sasl", "sasl.sasl_error_logger", [
+  {default, off},
+  {datatype, flag},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Protocol
+%%--------------------------------------------------------------------
+
+%% @doc Set the Max ClientId Length Allowed.
+{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+%% @doc Max Packet Size Allowed, 64K by default.
+{mapping, "mqtt.max_packet_size", "emqttd.protocol", [
+  {default, "64KB"},
+  {datatype, bytesize}
+]}.
+
+%% @doc Client Idle Timeout.
+{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
+  {default, 30},
+  {datatype, integer}
+]}.
+
+{translation, "emqttd.protocol", fun(Conf) ->
+  [{max_clientid_len,    cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
+   {max_packet_size,     cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
+   {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
+end}.
+
+%% @doc Allow Anonymous
+{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
+  {default, false},
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Session
+%%--------------------------------------------------------------------
+
+%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
+%% 0 means no limit
+{mapping, "mqtt.session.max_inflight", "emqttd.session", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
+
+%% @doc Retry interval for redelivering QoS1/2 messages.
+{mapping, "mqtt.session.retry_interval", "emqttd.session", [
+  {default, 60},
+  {datatype, integer}
+]}.
+
+%% @doc Awaiting PUBREL Timeout
+{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
+  {default, 30},
+  {datatype, integer}
+]}.
+
+%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
+{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Statistics Collection Interval(seconds)
+{mapping, "mqtt.session.collect_interval", "emqttd.session", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+%% @doc Session expired after...
+{mapping, "mqtt.session.expired_after", "emqttd.session", [
+  {default, "2d"},
+  {datatype, {duration, s}}
+]}.
+
+{translation, "emqttd.session", fun(Conf) ->
+  [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
+   {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+   {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
+   {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
+   {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
+   {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Queue
+%%--------------------------------------------------------------------
+
+%% @doc Type: simple | priority
+{mapping, "mqtt.queue.type", "emqttd.queue", [
+  {default, simple},
+  {datatype, atom}
+]}.
+
+%% @doc Topic Priority: 0~255, Default is 0
+{mapping, "mqtt.queue.priority", "emqttd.queue", [
+  {default, ""},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
+{mapping, "mqtt.queue.max_length", "emqttd.queue", [
+  {default, infinity},
+  {datatype, [atom, integer]}
+]}.
+
+%% @doc Low-water mark of queued messages
+{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
+  {default, "20%"},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc High-water mark of queued messages
+{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
+  {default, "60%"},
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Queue Qos0 messages?
+{mapping, "mqtt.queue.qos0", "emqttd.queue", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.queue", fun(Conf) ->
+  Parse = fun(S) ->
+			{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
+			list_to_integer(N) / 100
+	      end,
+  Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)},
+          {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)},
+          {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))},
+          {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))},
+          {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}],
+  case cuttlefish:conf_get("mqtt.queue.priority", Conf) of
+    undefined -> Opts;
+    V -> [{priority,
+			 [begin [T, P] = string:tokens(S, "="),
+					{T, list_to_integer(P)}
+		      end || S <- string:tokens(V, ",")]}|Opts]
+  end
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Broker
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [
+  {default, 60},
+  {datatype, integer}
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT PubSub
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
+  {default, true},
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{translation, "emqttd.pubsub", fun(Conf) ->
+  [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
+   {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)},
+   {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Bridge
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [
+  {default, 10000},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
+  {default, 1},
+  {datatype, integer}
+]}.
+
+{translation, "emqttd.bridge", fun(Conf) ->
+  [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
+   {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
+end}.
+
+%%-------------------------------------------------------------------
+%% MQTT Plugins
+%%-------------------------------------------------------------------
+
+{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [
+  {datatype, string}
+]}.
+
+%%--------------------------------------------------------------------
+%% MQTT Listeners
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
+  {default, 1883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
+  {default, undefined},
+  {datatype, string},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
+  {default, 512},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
+  {default, 15},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "mqtt.listener.http", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
+  {default, 64},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https", "emqttd.listeners", [
+  {default, undefined},
+  {datatype, [integer, ip]},
+  hidden
+]}.
+
+{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
+  {default, 64},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [
+  {default, 15},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.listeners", fun(Conf) ->
+    Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+    LisOpts = fun(Prefix) ->
+                  Filter([{acceptors,   cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
+                          {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
+                          {rate_limt,   cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}])
+ end,
+    TcpOpts = fun(Prefix) ->
+                   Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
+                           {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
+                           {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
+                           {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
+                           {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
+              end,
+    SslOpts = fun(Prefix) ->
+                  Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)},
+                          {keyfile,    cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
+                          {certfile,   cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
+                          {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
+                          {verify,     cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)},
+                          {failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}])
+              end,
+
+    Listeners = fun(Name) when is_atom(Name) ->
+                    Key = "mqtt.listener." ++ atom_to_list(Name),
+                    case cuttlefish:conf_get(Key, Conf, undefined) of
+                        undefined ->
+                            [];
+                        Port ->
+                            ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
+                            Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
+                            [{Name, Port, case Name =:= ssl orelse Name =:= https of
+                                              true  -> [{ssl, SslOpts(Key)} | Opts];
+                                              false -> Opts
+                                          end}]
+                   end
+                end,
+    lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)])
+end}.
+
+%%--------------------------------------------------------------------
+%% MQTT Modules
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.module.retainer", "emqttd.modules", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [
+  {default, ram},
+  {datatype, {enum, [disc, ram]}}
+]}.
+
+{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [
+  {default, 100000},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [
+  {default, "64KB"},
+  {datatype, bytesize}
+]}.
+
+{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [
+  {default, 0},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.module.presence", "emqttd.modules", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.presence.qos", "emqttd.modules", [
+  {default, 0},
+  {datatype, integer},
+  {validators, ["range:0-2"]}
+]}.
+
+{mapping, "mqtt.module.subscription", "emqttd.modules", [
+  {default, off},
+  {datatype, flag}
+]}.
+
+{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [
+  {default, undefined},
+  {datatype, string}
+]}.
+
+{translation, "emqttd.modules", fun(Conf) ->
+    WithMod = fun(Name, OptsF) ->
+                  Key = "mqtt.module." ++ atom_to_list(Name),
+                  case cuttlefish:conf_get(Key, Conf, false) of
+                      true  -> [{Name, OptsF(Key)}];
+                      false -> []
+                  end
+              end,
+    RetainOpts = fun(Prefix) ->
+                     [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)},
+                      {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)},
+                      {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)},
+                      {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}]
+                 end,
+    PresOpts = fun(Prefix) ->
+                   [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}]
+               end,
+    ParseFun = fun(undefined) -> [];
+                  (Topics)    -> [begin
+                                      [Topic, Qos] = string:tokens(S, "="),
+                                      {list_to_binary(Topic), list_to_integer(Qos)}
+                                  end || S <- string:tokens(Topics, ",")]
+               end,
+    SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end,
+    lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)])
+end}.
+
+%%--------------------------------------------------------------------
+%% System Monitor
+%%--------------------------------------------------------------------
+
+%% @doc Long GC, don't monitor in production mode for:
+%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+{mapping, "sysmon.long_gc", "emqttd.sysmon", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Long Schedule(ms)
+{mapping, "sysmon.long_schedule", "emqttd.sysmon", [
+  {default, 1000},
+  {datatype, integer}
+]}.
+
+%% @doc Large Heap
+{mapping, "sysmon.large_heap", "emqttd.sysmon", [
+  {default, "8MB"},
+  {datatype, bytesize}
+]}.
+
+%% @doc Monitor Busy Port
+{mapping, "sysmon.busy_port", "emqttd.sysmon", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Monitor Busy Dist Port
+{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{translation, "emqttd.sysmon", fun(Conf) ->
+    [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
+     {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
+     {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
+     {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
+     {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
+end}.
+

+ 9 - 14
test/emqttd_access_SUITE.erl

@@ -41,7 +41,6 @@ groups() ->
 init_per_group(access_control, Config) ->
     application:load(emqttd),
     prepare_config(),
-    gen_conf:init(emqttd),
     Config;
 
 init_per_group(_Group, Config) ->
@@ -92,43 +91,39 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 
 reload_acl(_) ->
-    [ok] = ?AC:reload_acl().
+    [] = ?AC:reload_acl().
 
 register_mod(_) ->
     ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
     {error, already_existed} = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
-    [{emqttd_acl_test_mod, _, 0},
-     {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
+    [{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl),
     ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
     ok = ?AC:register_mod(auth, emqttd_auth_dashboard, [], 99),
     [{emqttd_auth_dashboard, _, 99},
-     {emqttd_auth_anonymous_test_mod, _, 0},
-     {emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
+     {emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth).
 
 unregister_mod(_) ->
     ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
-    [{emqttd_acl_test_mod, _, 0},
-     {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
+    [{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl),
     ok = ?AC:unregister_mod(acl, emqttd_acl_test_mod),
     timer:sleep(5),
-    [{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
+    [] = ?AC:lookup_mods(acl),
     ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
-    [{emqttd_auth_anonymous_test_mod, _, 0},
-     {emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth),
+    [{emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth),
 
     ok = ?AC:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
     timer:sleep(5),
-    [{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
+    [] = ?AC:lookup_mods(auth).
 
 check_acl(_) ->
     User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>},
     User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>},
     allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>),
     allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>),
-    deny  = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
+    allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
     allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>),
     allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>),
-    deny  = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
+    allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
 
 %%--------------------------------------------------------------------
 %% emqttd_access_rule

+ 174 - 0
test/emqttd_vm_SUITE.erl

@@ -0,0 +1,174 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_vm_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+
+-define(SYSTEM_INFO, [allocated_areas,
+                      allocator,
+                      alloc_util_allocators,
+                      build_type,
+                      check_io,
+                      compat_rel,
+                      creation,
+                      debug_compiled,
+                      dist,
+                      dist_ctrl,
+                      driver_version,
+                      elib_malloc,
+                      dist_buf_busy_limit,
+                      %fullsweep_after, % included in garbage_collection
+                      garbage_collection,
+                      %global_heaps_size, % deprecated
+                      heap_sizes,
+                      heap_type,
+                      info,
+                      kernel_poll,
+                      loaded,
+                      logical_processors,
+                      logical_processors_available,
+                      logical_processors_online,
+                      machine,
+                      %min_heap_size, % included in garbage_collection
+                      %min_bin_vheap_size, % included in garbage_collection
+                      modified_timing_level,
+                      multi_scheduling,
+                      multi_scheduling_blockers,
+                      otp_release,
+                      port_count,
+                      process_count,
+                      process_limit,
+                      scheduler_bind_type,
+                      scheduler_bindings,
+                      scheduler_id,
+                      schedulers,
+                      schedulers_online,
+                      smp_support,
+                      system_version,
+                      system_architecture,
+                      threads,
+                      thread_pool_size,
+                      trace_control_word,
+                      update_cpu_info,
+                      version,
+                      wordsize]).
+
+-define(PROCESS_INFO, [initial_call,
+                       current_function,
+                       registered_name,
+                       status,
+                       message_queue_len,
+                       group_leader,
+                       priority,
+                       trap_exit,
+                       reductions,
+                       %%binary,
+                       last_calls,
+                       catchlevel,
+                       trace,
+                       suspending,
+                       sequential_trace_token,
+                       error_handler]).
+
+-define(PROCESS_GC, [memory,
+                     total_heap_size,
+                     heap_size,
+                     stack_size,
+                     min_heap_size]).
+                     %fullsweep_after]).
+
+
+
+all() ->
+    [load, systeminfo, mem_info, process_list, process_info, process_gc, 
+     get_ets_list, get_ets_info, get_ets_object, get_port_types, get_port_info,
+     scheduler_usage, get_memory, microsecs, schedulers, get_process_group_leader_info,
+     get_process_limit].
+
+load(_Config) ->
+    Loads = emqttd_vm:loads(),
+    [{load1, _}, {load5, _}, {load15, _}] = Loads.
+
+systeminfo(_Config) ->
+   Keys =  [Key || {Key, _} <- emqttd_vm:get_system_info()],
+   ?SYSTEM_INFO = Keys.
+
+mem_info(_Config) ->
+    application:ensure_all_started(os_mon),
+    MemInfo = emqttd_vm:mem_info(),
+    [{total_memory, _},
+     {used_memory, _}]= MemInfo,
+    application:stop(os_mon).
+
+process_list(_Config) ->
+    Pid = self(),
+    ProcessInfo = emqttd_vm:get_process_list(),
+    true = lists:member({pid, Pid}, lists:concat(ProcessInfo)).
+
+process_info(_Config) ->
+    ProcessInfos = emqttd_vm:get_process_info(), 
+    ProcessInfo = lists:last(ProcessInfos),
+    Keys = [K || {K, _V}<- ProcessInfo],
+    ?PROCESS_INFO = Keys.
+
+process_gc(_Config) ->
+    ProcessGcs = emqttd_vm:get_process_gc(), 
+    ProcessGc = lists:last(ProcessGcs),
+    Keys = [K || {K, _V}<- ProcessGc],
+    ?PROCESS_GC = Keys.
+   
+get_ets_list(_Config) ->
+    ets:new(test, [named_table]),
+    Ets =  emqttd_vm:get_ets_list(),
+    true = lists:member(test, Ets).
+
+get_ets_info(_Config) ->    
+    ets:new(test, [named_table]),
+    [] = emqttd_vm:get_ets_info(test1),
+    EtsInfo = emqttd_vm:get_ets_info(test),
+    test = proplists:get_value(name, EtsInfo).
+
+get_ets_object(_Config) ->
+    ets:new(test, [named_table]),
+    ets:insert(test, {k, v}),
+    [{k, v}] = emqttd_vm:get_ets_object(test).
+
+get_port_types(_Config) ->
+    emqttd_vm:get_port_types().
+
+get_port_info(_Config) ->
+    emqttd_vm:get_port_info().
+
+scheduler_usage(_Config) ->
+    emqttd_vm:scheduler_usage(5000).
+
+get_memory(_Config) ->
+    emqttd_vm:get_memory().
+   
+microsecs(_Config) ->
+    emqttd_vm:microsecs().
+
+schedulers(_Config) ->
+    emqttd_vm:schedulers().
+
+get_process_group_leader_info(_Config) ->
+    emqttd_vm:get_process_group_leader_info(self()).
+
+get_process_limit(_Config) ->
+    emqttd_vm:get_process_limit().