Feng 9 лет назад
Родитель
Сommit
c0c79e8171

+ 0 - 252
etc/emqttd.2.0.conf

@@ -1,252 +0,0 @@
-
-%%--------------------------------------------------------------------
-%% MQTT Protocol
-%%--------------------------------------------------------------------
-
-%% Max ClientId Length Allowed.
-{mqtt_max_clientid_len, 512}.
-
-%% Max Packet Size Allowed, 64K by default.
-{mqtt_max_packet_size, 65536}.
-
-%% Client Idle Timeout.
-{mqtt_client_idle_timeout, 30}. % Second
-
-%%--------------------------------------------------------------------
-%% Authentication
-%%--------------------------------------------------------------------
-
-%% Anonymous: Allow all
-{auth, anonymous, []}.
-
-%% Authentication with username, password
-{auth, username, [{passwd, "etc/modules/passwd.conf"}]}.
-
-%% Authentication with clientId
-{auth, clientid, [{config, "etc/modules/client.conf"}, {password, no}]}.
-
-%%--------------------------------------------------------------------
-%% ACL
-%%--------------------------------------------------------------------
-
-{acl, anonymous, []}.
-
-{acl, internal, [{config, "etc/modules/acl.conf"}, {nomatch, allow}]}.
-
-%% Cache ACL result for PUBLISH
-{cache_acl, true}.
-
-
-%%--------------------------------------------------------------------
-%% Session
-%%--------------------------------------------------------------------
-
-%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
-%% 0 means no limit
-{session_max_inflight, 100}.
-
-%% Retry interval for redelivering QoS1/2 messages.
-{session_unack_retry_interval, 60}.
-
-%% Awaiting PUBREL Timeout
-{session_await_rel_timeout, 20}.
-
-%% Max Packets that Awaiting PUBREL, 0 means no limit
-{session_max_awaiting_rel, 0}.
-
-%% Statistics Collection Interval(seconds)
-{session_collect_interval, 0}.
-
-%% Expired after 2 day (unit: minute)
-{session_expired_after, 2880}.
-
-%%--------------------------------------------------------------------
-%% Queue
-%%--------------------------------------------------------------------
-
-%% Type: simple | priority
-{queue_type, simple}.
-
-%% Topic Priority: 0~255, Default is 0
-%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}.
-
-%% Max queue length. Enqueued messages when persistent client disconnected,
-%% or inflight window is full.
-{queue_max_length, infinity}.
-
-%% Low-water mark of queued messages
-{queue_low_watermark, 0.2}.
-
-%% High-water mark of queued messages
-{queue_high_watermark, 0.6}.
-
-%% Queue Qos0 messages?
-{queue_qos0, true}.
-
-%%--------------------------------------------------------------------
-%% Listener
-%%--------------------------------------------------------------------
-
-%% Plain MQTT
-{listener, mqtt, 1883, [
-    %% Size of acceptor pool
-    {acceptors, 16},
-
-    %% Maximum number of concurrent clients
-    {max_clients, 512},
-
-    %% Mount point prefix
-    %% {mount_point, "prefix/"},
-
-    %% Socket Access Control
-    {access, [{allow, all}]},
-
-    %% Connection Options
-    {connopts, [
-        %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec
-        %% {rate_limit, "100,10"} %% 100K burst, 10K rate
-    ]},
-
-    %% Socket Options
-    {sockopts, [
-        %Set buffer if hight thoughtput
-        %{recbuf, 4096},
-        %{sndbuf, 4096},
-        %{buffer, 4096},
-        %{nodelay, true},
-        {backlog, 1024}
-    ]}
-]}.
-
-%% MQTT/SSL
-{listener, mqtts, 8883, [
-    %% Size of acceptor pool
-    {acceptors, 4},
-
-    %% Maximum number of concurrent clients
-    {max_clients, 512},
-
-    %% Mount point prefix
-    %% {mount_point, "secure/"},
-
-    %% Socket Access Control
-    {access, [{allow, all}]},
-
-    %% SSL certificate and key files
-    {ssl, [{handshake_timeout, 10000},
-           %% Mutual SSL Authentication option
-           %% {verify, verify_peer},
-           %% {cacertfile, "etc/ssl/ca.pem"},
-           {certfile, "etc/ssl/ssl.crt"},
-           {keyfile,  "etc/ssl/ssl.key"}]},
-
-    %% Socket Options
-    {sockopts, [
-        {backlog, 1024}
-        %{buffer, 4096},
-    ]}
-]}.
-
-%% HTTP and WebSocket Listener
-{listener, http, 8083, [
-    %% Size of acceptor pool
-    {acceptors, 4},
-
-    %% Maximum number of concurrent clients
-    {max_clients, 64},
-
-    %% Socket Access Control
-    {access, [{allow, all}]},
-
-    %% Socket Options
-    {sockopts, [
-        {backlog, 1024}
-        %{buffer, 4096},
-    ]}
-]}.
-
-%%--------------------------------------------------------------------
-%% PubSub
-%%--------------------------------------------------------------------
-
-%% PubSub Pool Size. Default should be scheduler numbers.
-{pubsub_pool_size, 8}.
-
-{pubsub_by_clientid, true}.
-
-%% Subscribe Asynchronously
-{pubsub_async, true}.
-
-%%--------------------------------------------------------------------
-%% Bridge
-%%--------------------------------------------------------------------
-
-%% TODO: Bridge Queue Size
-{bridge_max_queue_len, 10000}.
-
-%% Ping Interval of bridge node
-{bridge_ping_down_interval, 1}. % second
-
-%%-------------------------------------------------------------------
-%% Plugins
-%%-------------------------------------------------------------------
-
-%% Dir of plugins' config
-{plugins_etc_dir, "etc/plugins/"}.
-
-%% File to store loaded plugin names.
-{plugins_loaded_file, "data/loaded_plugins"}.
-
-%%--------------------------------------------------------------------
-%% Modules
-%%--------------------------------------------------------------------
-
-%% Retainer Module
-{module, retainer, [
-
-    %% disc: disc_copies, ram: ram_copies
-    {storage_type, disc},
-
-    %% Max number of retained messages
-    {max_message_num, 100000},
-
-    %% Max Payload Size of retained message
-    {max_playload_size, 65536},
-
-    %% Expired after seconds, never expired if 0
-    {expired_after, 0}
-
-]}.
-
-%% Client presence management module. Publish presence messages when 
-%% client connected or disconnected.
-{module, presence, [{qos, 0}]}.
-
-%% Subscribe topics automatically when client connected
-{module, subscription, [{"$client/%c", 1}]}.
-
-%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite)
-%% {module, rewrite, [{config, "etc/modules/rewrite.conf"}]}.
-
-%%-------------------------------------------------------------------
-%% Erlang System Monitor
-%%-------------------------------------------------------------------
-
-%% Long GC, don't monitor in production mode for:
-%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
-
-{sysmon_long_gc, false}.
-
-%% Long Schedule(ms)
-{sysmon_long_schedule, 240}.
-
-%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
-%% 8 * 1024 * 1024
-{sysmon_large_heap, 8388608}.
-
-%% Busy Port
-{sysmon_busy_port, false}.
-
-%% Busy Dist Port
-{sysmon_busy_dist_port, true}.
-

+ 86 - 58
etc/emqttd.conf

@@ -1,51 +1,70 @@
 ##====================================================================
 ##
-## Config File for EMQ 3.0
+## Configration for EMQ 3.0
 ##
 ##====================================================================
 
 ##--------------------------------------------------------------------
-## Erlang VM
+## Node
 ##--------------------------------------------------------------------
 
-## Erlang node name
-vm.nodename = emqttd@127.0.0.1
+## Node Id: 1~255
+node.id = 1
 
-## Cookie for distributed erlang
-vm.setcookie = emqsecretcookie
+## Node Name
+node.name = emqttd@127.0.0.1
+
+## Cookie for distributed node
+node.cookie = emq_dist_cookie
 
 ## SMP support: enable, auto, disable
-vm.smp = auto
+node.smp = auto
 
 ## Enable kernel poll
-vm.kernel_poll = on
+node.kernel_poll = on
 
 ## async thread pool
-vm.async_threads = 32
+node.async_threads = 32
 
 ## Erlang Process Limit
-vm.process_limit = 256000
+node.process_limit = 256000
 
 ## Sets the maximum number of simultaneously existing ports for this system
-vm.max_ports = 262144
+node.max_ports = 262144
 
 ## Set the distribution buffer busy limit (dist_buf_busy_limit)
-vm.dist_buffer_size = 32MB
+node.dist_buffer_size = 32MB
 
 ## Max ETS Tables.
 ## Note that mnesia and SSL will create temporary ets tables.
-vm.max_ets_tables = 256000
+node.max_ets_tables = 256000
 
 ## Tweak GC to run more often
-vm.fullsweep_after = 1000
+node.fullsweep_after = 1000
 
 ## Crash dump
-vm.crash_dump = log/crash.dump
+node.crash_dump = $(platform_log_dir)/crash.dump
 
 ##--------------------------------------------------------------------
 ## Log
 ##--------------------------------------------------------------------
 
+## Console log. Enum: off, file, console, both
+log.console = console
+
+## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency, none
+log.console.level = error
+
+## Console log file
+## log.console.file = $(platform_log_dir)/console.log
+
+## Error log file
+log.error.file = $(platform_log_dir)/error.log
+
+## Enable the crash log. Enum: on, off
+log.crash = on
+
+log.crash.file = $(platform_log_dir)/crash.log
 
 ##--------------------------------------------------------------------
 ## MQTT Protocol
@@ -60,45 +79,6 @@ mqtt.max_packet_size = 64KB
 ## Client Idle Timeout (Second)
 mqtt.client_idle_timeout = 30
 
-##--------------------------------------------------------------------
-## MQTT Listeners
-##--------------------------------------------------------------------
-
-## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
-## mqtt.listener.tcp = 1883
-## mqtt.listener.tcp.acceptors = 16
-## mqtt.listener.tcp.max_clients = 512
-## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
-## mqtt.listener.tcp.rate_limit = 100,10
-## Mount Point
-## mqtt.listener.tcp.mount_point = prefix/
-## TCP Socket Options. Set buffer if hight thoughtput
-## mqtt.listener.tcp.opts.recbuf = 4096
-## mqtt.listener.tcp.opts.sndbuf = 4096
-## mqtt.listener.tcp.opts.buffer = 4096
-## mqtt.listener.tcp.opts.nodelay = true
-## mqtt.listener.tcp.opts.backlog = 1024
-
-## SSL Listener
-## mqtt.listener.ssl = 883
-## mqtt.listener.ssl.acceptors = 4
-## mqtt.listener.ssl.max_clients = 512
-## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
-## mqtt.listener.ssl.rate_limit = 100,10
-## Mount Point
-## mqtt.listener.ssl.mount_point = prefix/
-## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds
-## mqtt.listener.ssl.opts.certfile = etc/ssl/cert.pem
-## mqtt.listener.ssl.opts.keyfile = etc/ssl/key.pem
-## mqtt.listener.ssl.opts.cacertfile = etc/ssl/cacert.pem
-## mqtt.listener.ssl.opts.verify = verify_peer
-## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true
-
-## HTTP Listener
-## mqtt.listener.http = 8083
-## mqtt.listener.http.acceptors = 4
-## mqtt.listener.http.max_clients = 64
-
 ##--------------------------------------------------------------------
 ## MQTT Session
 ##--------------------------------------------------------------------
@@ -108,7 +88,7 @@ mqtt.client_idle_timeout = 30
 mqtt.session.max_inflight = 100
 
 ## Retry interval for redelivering QoS1/2 messages.
-mqtt.session.unack_retry_interval = 60
+mqtt.session.retry_interval = 60
 
 ## Awaiting PUBREL Timeout
 mqtt.session.await_rel_timeout = 20
@@ -165,11 +145,55 @@ mqtt.pubsub.by_clientid = true
 ## Subscribe Asynchronously
 mqtt.pubsub.async = true
 
+##--------------------------------------------------------------------
+## MQTT Listeners
+##--------------------------------------------------------------------
+
+## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
+mqtt.listener.tcp = 1883
+mqtt.listener.tcp.acceptors = 8
+mqtt.listener.tcp.max_clients = 1024
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.tcp.rate_limit = 100,10
+## Mount Point
+## mqtt.listener.tcp.mount_point = prefix/
+## TCP Socket Options. Set buffer if hight thoughtput
+## mqtt.listener.tcp.opts.recbuf = 4096
+## mqtt.listener.tcp.opts.sndbuf = 4096
+## mqtt.listener.tcp.opts.buffer = 4096
+## mqtt.listener.tcp.opts.nodelay = true
+## mqtt.listener.tcp.opts.backlog = 1024
+
+## SSL Listener
+mqtt.listener.ssl = 8883
+mqtt.listener.ssl.acceptors = 4
+mqtt.listener.ssl.max_clients = 512
+## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
+## mqtt.listener.ssl.rate_limit = 100,10
+## Mount Point
+## mqtt.listener.ssl.mount_point = prefix/
+## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds
+## mqtt.listener.ssl.opts.certfile = $(platform_etc_dir)/ssl/cert.pem
+## mqtt.listener.ssl.opts.keyfile = $(platform_etc_dir)/ssl/key.pem
+## mqtt.listener.ssl.opts.cacertfile = $(platform_etc_dir)/ssl/cacert.pem
+## mqtt.listener.ssl.opts.verify = verify_peer
+## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true
+
+## HTTP Listener
+mqtt.listener.http = 127.0.0.1:8083
+mqtt.listener.http.acceptors = 4
+mqtt.listener.http.max_clients = 64
+
+## HTTP(SSL) Listener
+## mqtt.listener.http = 8083
+## mqtt.listener.http.acceptors = 4
+## mqtt.listener.http.max_clients = 64
+
 ##--------------------------------------------------------------------
 ## MQTT Bridge
 ##--------------------------------------------------------------------
 
-## TODO: Bridge Queue Size
+## Bridge Queue Size
 mqtt.bridge.max_queue_len = 10000
 
 ## Ping Interval of bridge node
@@ -180,10 +204,10 @@ mqtt.bridge.ping_down_interval = 1 ## Second
 ##-------------------------------------------------------------------
 
 ## Dir of plugins' config
-## mqtt.plugins.etc_dir = etc/plugins/
+mqtt.plugins.etc_dir = $(platform_etc_dir)/plugins/
 
 ## File to store loaded plugin names.
-## mqtt.plugins.loaded_file = data/loaded_plugins
+mqtt.plugins.loaded_file = $(platform_data_dir)/loaded_plugins
 
 ##-------------------------------------------------------------------
 ## MQTT Modules
@@ -235,3 +259,7 @@ sysmon.busy_port = false
 ## Busy Dist Port
 sysmon.busy_dist_port = true
 
+platform_etc_dir = ./etc
+
+platform_log_dir = ./log
+

+ 0 - 31
etc/emqttd.conf.3

@@ -28,19 +28,6 @@
 %% Cache ACL result for PUBLISH
 {cache_acl, true}.
 
-##--------------------------------------------------------------------
-## Broker
-##--------------------------------------------------------------------
-
-## System interval of publishing broker $SYS messages
-broker.sys_interval = 60.
-
-##--------------------------------------------------------------------
-## Session
-##--------------------------------------------------------------------
-
-
-
 %%--------------------------------------------------------------------
 %% Listener
 %%--------------------------------------------------------------------
@@ -190,21 +177,3 @@ broker.sys_interval = 60.
 %% Erlang System Monitor
 %%-------------------------------------------------------------------
 
-%% Long GC, don't monitor in production mode for:
-%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
-
-sysmon.long_gc = false
-
-%% Long Schedule(ms)
-sysmon.long_schedule = 240
-
-%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
-%% 8 * 1024 * 1024
-sysmon.large_heap = 8388608
-
-%% Busy Port
-sysmon.busy_port = false
-
-%% Busy Dist Port
-sysmon.busy_dist_port = true
-

+ 240 - 240
priv/emqttd.schema

@@ -2,42 +2,49 @@
 %% EMQ 3.0 config mapping
 
 %%--------------------------------------------------------------------
-%% Erlang VM Args
+%% Erlang Node
 %%--------------------------------------------------------------------
 
+%% @doc Node Id 
+{mapping, "node.id", "emqttd.node_id", [
+  {default, 1},
+  {datatype, integer},
+  {validators, ["range:0-255"]}
+]}.
+
 %% @doc Erlang node name
-{mapping, "vm.nodename", "vm_args.-name", [
+{mapping, "node.name", "vm_args.-name", [
   {default, "emqttd@127.0.0.1"}
 ]}.
 
 %% @doc Secret cookie for distributed erlang node
-{mapping, "vm.setcookie", "vm_args.-setcookie", [
+{mapping, "node.cookie", "vm_args.-setcookie", [
   {default, "emqsecretcookie"}
 ]}.
 
 %% @doc SMP Support
-{mapping, "vm.smp", "vm_args.-smp", [
+{mapping, "node.smp", "vm_args.-smp", [
   {default, auto},
   {datatype, {enum, [enable, auto, disable]}},
   hidden
 ]}.
 
 %% @doc Enable Kernel Poll
-{mapping, "vm.kernel_poll", "vm_args.+K", [
+{mapping, "node.kernel_poll", "vm_args.+K", [
   {default, on},
   {datatype, flag},
   hidden
 ]}.
 
 %% @doc More information at: http://erlang.org/doc/man/erl.html
-{mapping, "vm.async_threads", "vm_args.+A", [
+{mapping, "node.async_threads", "vm_args.+A", [
   {default, 64},
   {datatype, integer},
   {validators, ["range:0-1024"]}
 ]}.
 
 %% @doc Erlang Process Limit
-{mapping, "vm.process_limit", "vm_args.+P", [
+{mapping, "node.process_limit", "vm_args.+P", [
   {datatype, integer},
   {default, 256000},
   hidden
@@ -46,7 +53,7 @@
 %% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
 %% @doc The number of concurrent ports/sockets
 %% Valid range is 1024-134217727
-{mapping, "vm.max_ports",
+{mapping, "node.max_ports",
   cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
   {default, 262144},
   {datatype, integer},
@@ -57,7 +64,7 @@
  fun(X) -> X >= 1024 andalso X =< 134217727 end}.
 
 %% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
-{mapping, "vm.dist_buffer_size", "vm_args.+zdbbl", [
+{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
   {datatype, bytesize},
   {commented, "32MB"},
   hidden,
@@ -83,7 +90,7 @@
 }.
 
 %% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
-{mapping, "vm.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
+{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
   {default, 1000},
   {datatype, integer},
   hidden,
@@ -96,7 +103,7 @@
 %% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
 %% R16+ uses +e
 %% @doc The ETS table limit
-{mapping, "vm.max_ets_tables",
+{mapping, "node.max_ets_tables",
   cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
   {default, 256000},
   {datatype, integer},
@@ -104,35 +111,130 @@
 ]}.
 
 %% @doc Set the location of crash dumps
-{mapping, "vm.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
+{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
   {default, "{{crash_dump}}"},
   {datatype, file},
   hidden
 ]}.
 
+%%--------------------------------------------------------------------
+%% Log
+%%--------------------------------------------------------------------
+
+{mapping, "log.console", "lager.handlers", [
+  {default, file },
+  {datatype, {enum, [off, file, console, both]}}
+]}.
+
+{mapping, "log.console.level", "lager.handlers", [
+  {default, info},
+  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
+]}.
+
+{mapping, "log.console.file", "lager.handlers", [
+  {default, "$(platform_log_dir)/console.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.file", "lager.handlers", [
+  {default, "$(platform_log_dir)/error.log"},
+  {datatype, file}
+]}.
+
+{mapping, "log.error.redirect", "lager.error_logger_redirect", [
+  {default, on},
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
+  {default, 1000},
+  {datatype, integer},
+  hidden
+]}.
+
+{translation,
+ "lager.handlers",
+ fun(Conf) ->
+    ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
+      undefined -> [];
+      ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
+                                              {level, error},
+                                              {size, 10485760},
+                                              {date, "$D0"},
+                                              {count, 5}]}]
+    end,
+
+    ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
+    ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
+
+    ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
+    ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
+                                               {level, ConsoleLogLevel},
+                                               {size, 10485760},
+                                               {date, "$D0"},
+                                               {count, 5}]},
+
+    ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
+      off -> [];
+      file -> [ConsoleFileHandler];
+      console -> [ConsoleHandler];
+      both -> [ConsoleHandler, ConsoleFileHandler];
+      _ -> []
+    end,
+    ConsoleHandlers ++ ErrorHandler
+  end
+}.
+
+{mapping, "log.crash", "lager.crash_log", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "log.crash.file", "lager.crash_log", [
+  {default, "$(platform_log_dir)/crash.log"},
+  {datatype, file}
+]}.
+
+{translation,
+ "lager.crash_log",
+ fun(Conf) ->
+     case cuttlefish:conf_get("log.crash", Conf) of
+         false -> undefined;
+         _ ->
+             cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
+     end
+ end}.
+
+{mapping, "sasl", "sasl.sasl_error_logger", [
+  {default, off},
+  {datatype, flag},
+  hidden
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Protocol
 %%--------------------------------------------------------------------
 
 %% @doc Set the Max ClientId Length Allowed.
-{mapping, "mqtt.max_clientid_len", "emqttd.mqtt_protocol", [
+{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [
   {default, 1024},
   {datatype, integer}
 ]}.
 
 %% @doc Max Packet Size Allowed, 64K by default.
-{mapping, "mqtt.max_packet_size", "emqttd.mqtt_protocol", [
+{mapping, "mqtt.max_packet_size", "emqttd.protocol", [
   {default, "64KB"},
   {datatype, bytesize}
 ]}.
 
 %% @doc Client Idle Timeout.
-{mapping, "mqtt.client_idle_timeout", "emqttd.mqtt_protocol", [
+{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
   {default, 30},
   {datatype, integer}
 ]}.
 
-{translation, "emqttd.mqtt_protocol", fun(Conf) ->
+{translation, "emqttd.protocol", fun(Conf) ->
   [{max_clientid_len,    cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
    {max_packet_size,     cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
    {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
@@ -144,45 +246,45 @@ end}.
 
 %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
 %% 0 means no limit
-{mapping, "mqtt.session.max_inflight", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.max_inflight", "emqttd.session", [
   {default, 100},
   {datatype, integer}
 ]}.
 
 
 %% @doc Retry interval for redelivering QoS1/2 messages.
-{mapping, "mqtt.session.unack_retry_interval", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.retry_interval", "emqttd.session", [
   {default, 60},
   {datatype, integer}
 ]}.
 
 %% @doc Awaiting PUBREL Timeout
-{mapping, "mqtt.session.await_rel_timeout", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
   {default, 30},
   {datatype, integer}
 ]}.
 
 %% @doc Max Packets that Awaiting PUBREL, 0 means no limit
-{mapping, "mqtt.session.max_awaiting_rel", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [
   {default, 0},
   {datatype, integer}
 ]}.
 
 %% @doc Statistics Collection Interval(seconds)
-{mapping, "mqtt.session.collect_interval", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.collect_interval", "emqttd.session", [
   {default, 0},
   {datatype, integer}
 ]}.
 
 %% @doc Session expired after...
-{mapping, "mqtt.session.expired_after", "emqttd.mqtt_session", [
+{mapping, "mqtt.session.expired_after", "emqttd.session", [
   {default, "2d"},
   {datatype, {duration, s}}
 ]}.
 
-{translation, "emqttd.mqtt_session", fun(Conf) ->
+{translation, "emqttd.session", fun(Conf) ->
   [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
-   {unack_retry_interval, cuttlefish:conf_get("mqtt.session.unack_retry_interval", Conf)},
+   {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
    {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
    {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
    {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
@@ -194,45 +296,45 @@ end}.
 %%--------------------------------------------------------------------
 
 %% @doc Type: simple | priority
-{mapping, "mqtt.queue.type", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.type", "emqttd.queue", [
   {default, simple},
   {datatype, atom}
 ]}.
 
 %% @doc Topic Priority: 0~255, Default is 0
-{mapping, "mqtt.queue.priority", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.priority", "emqttd.queue", [
   {default, ""},
   {datatype, string},
   hidden
 ]}.
 
 %% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
-{mapping, "mqtt.queue.max_length", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.max_length", "emqttd.queue", [
   {default, infinity},
   {datatype, [atom, integer]}
 ]}.
 
 %% @doc Low-water mark of queued messages
-{mapping, "mqtt.queue.low_watermark", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
   {default, "20%"},
   {datatype, string},
   hidden
 ]}.
 
 %% @doc High-water mark of queued messages
-{mapping, "mqtt.queue.high_watermark", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
   {default, "60%"},
   {datatype, string},
   hidden
 ]}.
 
 %% @doc Queue Qos0 messages?
-{mapping, "mqtt.queue.qos0", "emqttd.mqtt_queue", [
+{mapping, "mqtt.queue.qos0", "emqttd.queue", [
   {default, true},
   {datatype, {enum, [true, false]}}
 ]}.
 
-{translation, "emqttd.mqtt_queue", fun(Conf) ->
+{translation, "emqttd.queue", fun(Conf) ->
   Parse = fun(S) ->
 			{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
 			list_to_integer(N) / 100
@@ -254,58 +356,150 @@ end}.
 %%--------------------------------------------------------------------
 %% MQTT Broker
 %%--------------------------------------------------------------------
-{mapping, "mqtt.broker.sys_interval", "emqttd.mqtt_broker", [
+
+{mapping, "mqtt.broker.sys_interval", "emqttd.broker", [
   {default, 60},
   {datatype, integer}
 ]}.
 
-{translation, "emqttd.mqtt_broker", fun(Conf) ->
+{translation, "emqttd.broker", fun(Conf) ->
   [{sys_interval, cuttlefish:conf_get("mqtt.broker.sys_interval", Conf)}]
 end}.
 
 %%--------------------------------------------------------------------
 %% MQTT PubSub
 %%--------------------------------------------------------------------
-{mapping, "mqtt.pubsub.pool_size", "emqttd.mqtt_pubsub", [
+
+{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [
   {default, 8},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.pubsub.by_clientid", "emqttd.mqtt_pubsub", [
+{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [
   {default, true},
   {datatype, {enum, [true, false]}}
 ]}.
 
-{mapping, "mqtt.pubsub.async", "emqttd.mqtt_pubsub", [
+{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
   {default, true},
   {datatype, {enum, [true, false]}},
   hidden
 ]}.
 
-{translation, "emqttd.mqtt_pubsub", fun(Conf) ->
+{translation, "emqttd.pubsub", fun(Conf) ->
   [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
    {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)},
    {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
 end}.
 
+%%--------------------------------------------------------------------
+%% MQTT Listeners
+%%--------------------------------------------------------------------
+
+{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
+  {default, 1883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
+  {default, 512},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.http", "emqttd.listeners", [
+  {default, 8883},
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
+  {default, 64},
+  {datatype, integer}
+]}.
+
+{translation, "emqttd.listeners", fun(Conf) ->
+    TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of
+        undefined ->
+            [];
+        TcpPort ->
+            TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)},
+                       {max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)}],
+            [{tcp, TcpPort, TcpOpts}]
+        end,
+    SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of
+        undefined ->
+            [];
+        SslPort ->
+            SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)},
+                       {max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}],
+            [{ssl, SslPort, SslOpts}]
+    end,
+    HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of
+        undefined ->
+            [];
+        HttPort ->
+            HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)},
+                        {max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}],
+            [{http, HttPort, HttpOpts}]
+    end,
+    TcpListeners ++ SslListeners ++ HttpListeners
+end}.
+
 %%--------------------------------------------------------------------
 %% MQTT Bridge
 %%--------------------------------------------------------------------
-{mapping, "mqtt.bridge.max_queue_len", "emqttd.mqtt_bridge", [
+
+{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [
   {default, 10000},
   {datatype, integer}
 ]}.
 
-{mapping, "mqtt.bridge.ping_down_interval", "emqttd.mqtt_bridge", [
+{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
   {default, 1},
   {datatype, integer}
 ]}.
 
-{translation, "emqttd.mqtt_bridge", fun(Conf) ->
+{translation, "emqttd.bridge", fun(Conf) ->
   [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
    {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
 end}.
 
+%%-------------------------------------------------------------------
+%% MQTT Plugins
+%%-------------------------------------------------------------------
+
+{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [
+  {datatype, string}
+]}.
+
+{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [
+  {datatype, string}
+]}.
+
 %%--------------------------------------------------------------------
 %% System Monitor
 %%--------------------------------------------------------------------
@@ -349,212 +543,18 @@ end}.
      {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
 end}.
 
-%% @doc Where to emit the default log messages (typically at 'info'
-%% severity):
-%%     off: disabled
-%%    file: the file specified by log.console.file
-%% console: to standard output (seen when using `riak attach-direct`)
-%%    both: log.console.file and standard out.
-{mapping, "log.console", "lager.handlers", [
-  {default, file },
-  {datatype, {enum, [off, file, console, both]}}
-]}.
-
-%% @doc The severity level of the console log, default is 'info'.
-{mapping, "log.console.level", "lager.handlers", [
-  {default, info},
-  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
-]}.
-
-%% @doc When 'log.console' is set to 'file' or 'both', the file where
-%% console messages will be logged.
-{mapping, "log.console.file", "lager.handlers", [
-  {default, "$(platform_log_dir)/console.log"},
-  {datatype, file}
-]}.
-
-%% @doc The file where error messages will be logged.
-{mapping, "log.error.file", "lager.handlers", [
-  {default, "$(platform_log_dir)/error.log"},
-  {datatype, file}
-]}.
-
-%% @doc When set to 'on', enables log output to syslog.
-{mapping, "log.syslog", "lager.handlers", [
-  {default, off},
-  {datatype, flag}
-]}.
-
-%% @doc When set to 'on', enables log output to syslog.
-{mapping, "log.syslog.ident", "lager.handlers", [
-  {default, "riak"},
-  hidden
-]}.
-
-%% @doc Syslog facility to log entries from Riak.
-{mapping, "log.syslog.facility", "lager.handlers", [
-  {default, daemon},
-  {datatype, {enum,[kern, user, mail, daemon, auth, syslog,
-                    lpr, news, uucp, clock, authpriv, ftp,
-                    cron, local0, local1, local2, local3,
-                    local4, local5, local6, local7]}},
-  hidden
-]}.
-
-%% @doc The severity level at which to log entries to syslog, default is 'info'.
-{mapping, "log.syslog.level", "lager.handlers", [
-  {default, info},
-  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}},
-  hidden
-]}.
-
-{translation,
- "lager.handlers",
- fun(Conf) ->
-    SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
-      true ->
-        Ident = cuttlefish:conf_get("log.syslog.ident", Conf),
-        Facility = cuttlefish:conf_get("log.syslog.facility", Conf),
-        LogLevel = cuttlefish:conf_get("log.syslog.level", Conf),
-        [{lager_syslog_backend, [Ident, Facility, LogLevel]}];
-      _ -> []
-    end,
-    ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
-      undefined -> [];
-      ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
-                                              {level, error},
-                                              {size, 10485760},
-                                              {date, "$D0"},
-                                              {count, 5}]}]
-    end,
-
-    ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
-    ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
-
-    ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
-    ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
-                                               {level, ConsoleLogLevel},
-                                               {size, 10485760},
-                                               {date, "$D0"},
-                                               {count, 5}]},
-
-    ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
-      off -> [];
-      file -> [ConsoleFileHandler];
-      console -> [ConsoleHandler];
-      both -> [ConsoleHandler, ConsoleFileHandler];
-      _ -> []
-    end,
-    SyslogHandler ++ ConsoleHandlers ++ ErrorHandler
-  end
-}.
-
-
-%% @doc Whether to enable Erlang's built-in error logger.
-{mapping, "sasl", "sasl.sasl_error_logger", [
-  {default, off},
-  {datatype, flag},
-  hidden
-]}.
-
-%% @doc Whether to enable the crash log.
-{mapping, "log.crash", "lager.crash_log", [
-  {default, on},
-  {datatype, flag}
-]}.
-
-%% @doc If the crash log is enabled, the file where its messages will
-%% be written.
-{mapping, "log.crash.file", "lager.crash_log", [
-  {default, "$(platform_log_dir)/crash.log"},
-  {datatype, file}
-]}.
-
-{translation,
- "lager.crash_log",
- fun(Conf) ->
-     case cuttlefish:conf_get("log.crash", Conf) of
-         false -> undefined;
-         _ ->
-             cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
-     end
- end}.
-
-%% @doc Maximum size in bytes of individual messages in the crash log
-{mapping, "log.crash.maximum_message_size", "lager.crash_log_msg_size", [
-  {default, "64KB"},
-  {datatype, bytesize}
-]}.
-
-%% @doc Maximum size of the crash log in bytes, before it is rotated
-{mapping, "log.crash.size", "lager.crash_log_size", [
-  {default, "10MB"},
-  {datatype, bytesize}
-]}.
-
-%% @doc The schedule on which to rotate the crash log.  For more
-%% information see:
-%% https://github.com/basho/lager/blob/master/README.md#internal-log-rotation
-{mapping, "log.crash.rotation", "lager.crash_log_date", [
-  {default, "$D0"}
-]}.
-
-%% @doc The number of rotated crash logs to keep. When set to
-%% 'current', only the current open log file is kept.
-{mapping, "log.crash.rotation.keep", "lager.crash_log_count", [
-  {default, 5},
-  {datatype, [integer, {atom, current}]},
-  {validators, ["rotation_count"]}
-]}.
-
-{validator,
- "rotation_count",
- "must be 'current' or a positive integer",
- fun(current) -> true;
-    (Int) when is_integer(Int) andalso Int >= 0 -> true;
-    (_) -> false
- end}.
-
-{translation,
- "lager.crash_log_count",
- fun(Conf) ->
-    case cuttlefish:conf_get("log.crash.rotation.keep", Conf) of
-       current -> 0;
-       Int -> Int
-    end
- end}.
-
-%% @doc Whether to redirect error_logger messages into lager -
-%% defaults to true
-{mapping, "log.error.redirect", "lager.error_logger_redirect", [
-  {default, on},
-  {datatype, flag},
-  hidden
-]}.
-
-%% @doc Maximum number of error_logger messages to handle in a second
-{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
-  {default, 100},
-  {datatype, integer},
-  hidden
-]}.
-
-
-%% @doc Cookie for distributed node communication.  All nodes in the
-%% same cluster should use the same cookie or they will not be able to
-%% communicate.
-{mapping, "distributed_cookie", "vm_args.-setcookie", [
-  {default, "riak"}
-]}.
-
-%% @see platform_bin_dir
 {mapping, "platform_etc_dir", "emqttd.platform_etc_dir", [
   {datatype, directory},
   {default, "./etc"}
 ]}.
 
-%% @see platform_bin_dir
 {mapping, "platform_log_dir", "emqttd.platform_log_dir", [
   {datatype, directory},
   {default, "./log"}
 ]}.
+
+{mapping, "platform_data_dir", "emqttd.platform_data_dir", [
+  {datatype, directory},
+  {default, "./data"}
+]}.
+

+ 2 - 9
src/emqttd.erl

@@ -22,7 +22,7 @@
 
 -include("emqttd_protocol.hrl").
 
--export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]).
+-export([start/0, env/1, env/2, is_running/1]).
 
 %% PubSub API
 -export([subscribe/1, subscribe/2, subscribe/3, publish/1,
@@ -57,15 +57,8 @@
 -spec(start() -> ok | {error, any()}).
 start() -> application:start(?APP).
 
-%% @doc Get Config
--spec(conf(Key :: atom()) -> any()).
-conf(Key) -> emqttd_conf:value(Key).
-
--spec(conf(Key :: atom(), Default :: any()) -> any()).
-conf(Key, Default) -> emqttd_conf:value(Key, Default).
-
 %% @doc Environment
--spec(env(Key:: atom()) -> any()).
+-spec(env(Key:: atom()) -> {ok, any()} | undefined).
 env(Key) -> application:get_env(?APP, Key).
 
 %% @doc Get environment

+ 2 - 2
src/emqttd_access_control.erl

@@ -125,8 +125,8 @@ stop() -> gen_server:call(?MODULE, stop).
 
 init([]) ->
     ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
-    ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}),
-    ets:insert(?ACCESS_CONTROL_TAB, {acl_modules,  init_mods(gen_conf:list(emqttd, acl))}),
+    %%ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}),
+    %%ets:insert(?ACCESS_CONTROL_TAB, {acl_modules,  init_mods(gen_conf:list(emqttd, acl))}),
     {ok, #state{}}.
 
 init_mods(Mods) ->

+ 2 - 2
src/emqttd_app.erl

@@ -42,7 +42,6 @@
       Reason    :: term()).
 start(_StartType, _StartArgs) ->
     print_banner(),
-    emqttd_conf:init(),
     emqttd_mnesia:start(),
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
@@ -187,7 +186,8 @@ start_listener({listener, https, ListenOn, Opts}) ->
     mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
 
 start_listener(Protocol, ListenOn, Opts) ->
-    MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]},
+    {ok, Env} = emqttd:env(protocol),
+    MFArgs = {emqttd_client, start_link, [Env]},
     {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
 
 merge_sockopts(Options) ->

+ 2 - 1
src/emqttd_bridge_sup_sup.erl

@@ -46,7 +46,8 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
 start_bridge(Node, _Topic, _Options) when Node =:= node() ->
     {error, bridge_to_self};
 start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
-    Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
+    {ok, BridgeEnv} = emqttd:env(bridge),
+    Options1 = emqttd_opts:merge(BridgeEnv, Options),
     supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
 
 %% @doc Stop a bridge

+ 0 - 112
src/emqttd_conf.erl

@@ -1,112 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_conf).
-
--export([init/0]).
-
--export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]).
-
--export([value/1, value/2, list/1]).
-
--define(APP, emqttd).
-
-init() -> gen_conf:init(?APP).
-
-mqtt() ->
-    with_env(mqtt_protocol, [
-        %% Max ClientId Length Allowed.
-        {max_clientid_len,    value(mqtt_max_clientid_len, 512)},
-        %% Max Packet Size Allowed, 64K by default.
-        {max_packet_size,     value(mqtt_max_packet_size, 65536)},
-        %% Client Idle Timeout.
-        {client_idle_timeout, value(mqtt_client_idle_timeout, 30)}
-    ]).
-
-session() ->
-    with_env(mqtt_session, [
-        %% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
-        %% 0 means no limit
-        {max_inflight,         value(session_max_inflight, 100)},
-
-        %% Retry interval for redelivering QoS1/2 messages.
-        {unack_retry_interval, value(session_unack_retry_interval, 60)},
-
-        %% Awaiting PUBREL Timeout
-        {await_rel_timeout,    value(session_await_rel_timeout, 20)},
-
-        %% Max Packets that Awaiting PUBREL, 0 means no limit
-        {max_awaiting_rel,     value(session_max_awaiting_rel, 0)},
-
-        %% Statistics Collection Interval(seconds)
-        {collect_interval,     value(session_collect_interval, 0)},
-
-        %% Expired after 2 day (unit: minute)
-        {expired_after,        value(session_expired_after, 2880)}
-    ]).
-
-queue() ->
-    with_env(mqtt_queue, [
-        %% Type: simple | priority
-        {type,             value(queue_type, simple)},
-
-        %% Topic Priority: 0~255, Default is 0
-        {priority,         value(queue_priority, [])},
-
-        %% Max queue length. Enqueued messages when persistent client disconnected,
-        %% or inflight window is full.
-        {max_length,       value(queue_max_length, infinity)},
-
-        %% Low-water mark of queued messages
-        {low_watermark,    value(queue_low_watermark, 0.2)},
-
-        %% High-water mark of queued messages
-        {high_watermark,   value(queue_high_watermark, 0.6)},
-
-        %% Queue Qos0 messages?
-        {queue_qos0,       value(queue_qos0, true)}
-    ]).
-
-bridge() ->
-    with_env(mqtt_bridge, [
-        {max_queue_len,      value(bridge_max_queue_len, 10000)},
-
-        %% Ping Interval of bridge node
-        {ping_down_interval, value(bridge_ping_down_interval, 1)}
-    ]).
-
-pubsub() ->
-    with_env(mqtt_pubsub, [
-        %% PubSub and Router. Default should be scheduler numbers.
-        {pool_size, value(pubsub_pool_size, 8)}
-    ]).
-
-value(Key) ->
-    with_env(Key, gen_conf:value(?APP, Key)).
-
-value(Key, Default) ->
-    with_env(Key, gen_conf:value(?APP, Key, Default)).
-
-with_env(Key, Conf) ->
-    case application:get_env(?APP, Key) of
-        undefined ->
-            application:set_env(?APP, Key, Conf), Conf;
-        {ok, Val} ->
-            Val
-    end.
-
-list(Key) -> gen_conf:list(?APP, Key).
-

+ 3 - 3
src/emqttd_pubsub_sup.erl

@@ -32,7 +32,7 @@
 %%--------------------------------------------------------------------
 
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 pubsub_pool() ->
     hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
@@ -41,10 +41,10 @@ pubsub_pool() ->
 %% Supervisor Callbacks
 %%--------------------------------------------------------------------
 
-init([Env]) ->
+init([]) ->
+    {ok, Env} = emqttd:env(pubsub),
     %% Create ETS Tables
     [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]],
-
     {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }.
 
 %%--------------------------------------------------------------------

+ 6 - 5
src/emqttd_session.erl

@@ -214,8 +214,9 @@ unsubscribe(SessPid, TopicTable) ->
 
 init([CleanSess, {ClientId, Username}, ClientPid]) ->
     process_flag(trap_exit, true),
-    true    = link(ClientPid),
-    SessEnv = emqttd_conf:session(),
+    true = link(ClientPid),
+    {ok, QEnv} = emqttd:env(queue),
+    {ok, SessEnv} = emqttd:env(session),
     Session = #session{
             clean_sess        = CleanSess,
             client_id         = ClientId,
@@ -224,14 +225,14 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
             subscriptions     = #{},
             inflight_queue    = [],
             max_inflight      = get_value(max_inflight, SessEnv, 0),
-            message_queue     = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()),
+            message_queue     = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
             awaiting_rel      = #{},
             awaiting_ack      = #{},
             awaiting_comp     = #{},
-            retry_interval    = get_value(unack_retry_interval, SessEnv),
+            retry_interval    = get_value(retry_interval, SessEnv),
             await_rel_timeout = get_value(await_rel_timeout, SessEnv),
             max_awaiting_rel  = get_value(max_awaiting_rel, SessEnv),
-            expired_after     = get_value(expired_after, SessEnv) * 60,
+            expired_after     = get_value(expired_after, SessEnv),
             collect_interval  = get_value(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
     emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),

+ 1 - 1
src/emqttd_ws.erl

@@ -31,7 +31,7 @@
 %% @doc Handle WebSocket Request.
 handle_request(Req) ->
     Peer = Req:get(peer),
-    PktOpts = emqttd_conf:mqtt(),
+    {ok, PktOpts} = emqttd:env(protocol),
     ParserFun = emqttd_parser:new(PktOpts),
     {ReentryWs, ReplyChannel} = upgrade(Req),
     {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),

+ 3 - 3
src/emqttd_ws_client_sup.erl

@@ -27,7 +27,7 @@
 %% @doc Start websocket client supervisor
 -spec(start_link() -> {ok, pid()}).
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 %% @doc Start a WebSocket Client
 -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
@@ -37,8 +37,8 @@ start_client(WsPid, Req, ReplyChannel) ->
 %%--------------------------------------------------------------------
 %% Supervisor callbacks
 %%--------------------------------------------------------------------
-
-init([Env]) ->
+init([]) ->
+    {ok, Env} = emqttd:env(protocol),
     {ok, {{simple_one_for_one, 0, 1},
            [{ws_client, {emqttd_ws_client, start_link, [Env]},
              temporary, 5000, worker, [emqttd_ws_client]}]}}.