Quellcode durchsuchen

Merge pull request #943 from emqtt/emq20

Version 2.1.0-beta.2 updates
Feng Lee vor 9 Jahren
Ursprung
Commit
dbbe499769

+ 3 - 4
.travis.yml

@@ -1,10 +1,9 @@
 language: erlang
 
 otp_release:
-   - 18.0
-   - 18.1
-   - 18.2.1
-   - 18.3
+   - 19.0
+   - 19.1
+   - 19.2
 
 script: 
   - make

+ 7 - 7
Makefile

@@ -6,13 +6,13 @@ NO_AUTOPATCH = cuttlefish
 
 DEPS = gproc lager esockd mochiweb lager_syslog pbkdf2
 
-dep_gproc       = git https://github.com/uwiger/gproc
-dep_getopt      = git https://github.com/jcomellas/getopt v0.8.2
-dep_lager       = git https://github.com/basho/lager master
-dep_esockd      = git https://github.com/emqtt/esockd master
-dep_mochiweb    = git https://github.com/emqtt/mochiweb
-dep_lager_syslog  = git https://github.com/basho/lager_syslog
-dep_pbkdf2 	 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0
+dep_gproc        = git https://github.com/uwiger/gproc
+dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
+dep_lager        = git https://github.com/basho/lager master
+dep_esockd       = git https://github.com/emqtt/esockd emq20
+dep_mochiweb     = git https://github.com/emqtt/mochiweb
+dep_lager_syslog = git https://github.com/basho/lager_syslog
+dep_pbkdf2       = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0
 
 ERLC_OPTS += +'{parse_transform, lager_transform}'
 

+ 3 - 4
include/emqttd_internal.hrl

@@ -16,19 +16,18 @@
 
 %% Internal Header File
 
--define(GPROC_POOL(JoinOrLeave, Pool, I),
+-define(GPROC_POOL(JoinOrLeave, Pool, Id),
         (begin
             case JoinOrLeave of
                 join  -> gproc_pool:connect_worker(Pool, {Pool, Id});
-                leave -> gproc_pool:disconnect_worker(Pool, {Pool, I})
+                leave -> gproc_pool:disconnect_worker(Pool, {Pool, Id})
             end
         end)).
 
 -define(PROC_NAME(M, I), (list_to_atom(lists:concat([M, "_", I])))).
 
 -define(record_to_proplist(Def, Rec),
-        lists:zip(record_info(fields, Def),
-                  tl(tuple_to_list(Rec)))).
+        lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))).
 
 -define(record_to_proplist(Def, Rec, Fields),
     [{K, V} || {K, V} <- ?record_to_proplist(Def, Rec),

+ 1 - 1
priv/emq.schema

@@ -725,7 +725,7 @@ end}.
                             ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
                             Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
                             [{Name, Port, case Name =:= ssl orelse Name =:= https of
-                                              true  -> [{ssl, SslOpts(Key)} | Opts];
+                                              true  -> [{sslopts, SslOpts(Key)} | Opts];
                                               false -> Opts
                                           end}]
                    end

+ 1 - 1
rebar.config

@@ -1,4 +1,4 @@
 {deps, [
-{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}}
+{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{pbkdf2,".*",{git,"https://github.com/comtihon/erlang-pbkdf2.git","2.0.0"}}
 ]}.
 {erl_opts, [{parse_transform,lager_transform}]}.

+ 10 - 6
src/emqttd.app.src

@@ -1,8 +1,12 @@
 {application, emqttd, [
-	{description, "Erlang MQTT Broker"},
-	{vsn, "2.1.0"},
-	{modules, []},
-	{registered, [emqttd_sup]},
-	{applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog]},
-	{mod, {emqttd_app, []}}
+    {description, "Erlang MQTT Broker"},
+    {vsn, "2.1.0"},
+    {modules, []},
+    {registered, [emqttd_sup]},
+    {applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2]},
+    {env, []},
+    {mod, {emqttd_app, []}},
+    {maintainers, ["Feng Lee <feng@emqtt.io>"]},
+    {licenses, ["MIT"]},
+    {links, [{"Github", "https://github.com/emqtt/emqttd"}]}
 ]}.

+ 12 - 5
src/emqttd_client.erl

@@ -127,7 +127,7 @@ do_init(Conn, Env, Peername) ->
                                      force_gc_count = ForceGcCount}),
     IdleTimout = get_value(client_idle_timeout, Env, 30000),
     gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
-                           {backoff, 1000, 1000, 10000}).
+                           {backoff, 2000, 2000, 20000}).
 
 send_fun(Conn, Peername) ->
     Self = self(),
@@ -252,8 +252,13 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
                     {error, Error}              -> {error, Error}
                 end
              end,
-    KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
-    {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
+    case emqttd_keepalive:start(StatFun, Interval, {keepalive, check}) of
+        {ok, KeepAlive} ->
+            {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
+        {error, Error} ->
+            ?LOG(warning, "Keepalive error - ~p", [Error], State),
+            shutdown(Error, State)
+    end;
 
 handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of
@@ -372,5 +377,7 @@ shutdown(Reason, State) ->
 stop(Reason, State) ->
     {stop, Reason, State}.
 
-gc(State) ->
-    emqttd_gc:maybe_force_gc(#client_state.force_gc_count, State).
+gc(State = #client_state{connection = Conn}) ->
+    Cb = fun() -> Conn:gc() end,
+    emqttd_gc:maybe_force_gc(#client_state.force_gc_count, State, Cb).
+

+ 5 - 2
src/emqttd_gc.erl

@@ -20,7 +20,8 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
--export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2]).
+-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
+         maybe_force_gc/3]).
 
 -spec(conn_max_gc_count() -> integer()).
 conn_max_gc_count() ->
@@ -38,9 +39,11 @@ reset_conn_gc_count(Pos, State) ->
     end.
 
 maybe_force_gc(Pos, State) ->
+    maybe_force_gc(Pos, State, fun() -> ok end).
+maybe_force_gc(Pos, State, Cb) ->
     case element(Pos, State) of
         undefined     -> State;
-        I when I =< 0 -> garbage_collect(),
+        I when I =< 0 -> Cb(), garbage_collect(),
                          reset_conn_gc_count(Pos, State);
         I             -> setelement(Pos, State, I - 1)
     end.

+ 14 - 12
src/emqttd_keepalive.erl

@@ -29,14 +29,18 @@
 -export_type([keepalive/0]).
 
 %% @doc Start a keepalive
--spec(start(fun(), integer(), any()) -> undefined | keepalive()).
+-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, any()}).
 start(_, 0, _) ->
-    undefined;
+    {ok, #keepalive{}};
 start(StatFun, TimeoutSec, TimeoutMsg) ->
-    {ok, StatVal} = StatFun(),
-    #keepalive{statfun = StatFun, statval = StatVal,
-               tsec = TimeoutSec, tmsg = TimeoutMsg,
-               tref = timer(TimeoutSec, TimeoutMsg)}.
+    case StatFun() of
+        {ok, StatVal} ->
+            {ok, #keepalive{statfun = StatFun, statval = StatVal,
+                            tsec = TimeoutSec, tmsg = TimeoutMsg,
+                            tref = timer(TimeoutSec, TimeoutMsg)}};
+        {error, Error} ->
+            {error, Error}
+    end.
 
 %% @doc Check keepalive, called when timeout.
 -spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
@@ -59,12 +63,10 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
 
 %% @doc Cancel Keepalive
 -spec(cancel(keepalive()) -> ok).
-cancel(#keepalive{tref = TRef}) ->
-    cancel(TRef);
-cancel(undefined) -> 
-    ok;
-cancel(TRef) ->
-    catch erlang:cancel_timer(TRef).
+cancel(#keepalive{tref = TRef}) when is_reference(TRef) ->
+    catch erlang:cancel_timer(TRef), ok;
+cancel(_) ->
+    ok.
 
 timer(Sec, Msg) ->
     erlang:send_after(timer:seconds(Sec), self(), Msg).

+ 8 - 3
src/emqttd_ws_client.erl

@@ -104,7 +104,7 @@ init([Env, WsPid, Req, ReplyChannel]) ->
                          proto_state    = ProtoState,
                          enable_stats   = EnableStats,
                          force_gc_count = ForceGcCount},
-     IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}.
+     IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}.
 
 prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
@@ -198,8 +198,13 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
 
 handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
     ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
-    KeepAlive = emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}),
-    {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
+    case emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of
+        {ok, KeepAlive} ->
+            {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
+        {error, Error} ->
+            ?WSLOG(warning, "Keepalive error - ~p", [Error], State),
+            shutdown(Error, State)
+    end;
 
 handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of

+ 60 - 2
test/emqttd_SUITE.erl

@@ -47,7 +47,8 @@ all() ->
      {group, http},
      {group, cluster},
      {group, alarms},
-     {group, cli}].
+     {group, cli},
+     {group, cleanSession}].
 
 groups() ->
     [{protocol, [sequence],
@@ -103,7 +104,11 @@ groups() ->
        cli_bridges,
        cli_plugins,
        cli_listeners,
-       cli_vm]}].
+       cli_vm]},
+    {cleanSession, [sequence],
+      [cleanSession_validate,
+       cleanSession_validate1,
+       cleanSession_validate2]}].
 
 init_per_suite(Config) ->
     application:start(lager),
@@ -618,6 +623,59 @@ cli_vm(_) ->
     emqttd_cli:vm([]),
     emqttd_cli:vm(["ports"]).
 
+cleanSession_validate(_) ->
+    {ok, C1} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(10),
+    emqttc:subscribe(C1, <<"topic">>, qos0),
+    ok = emqttd_cli:sessions(["list", "persistent"]),
+    emqttc:disconnect(C1),
+    {ok, Pub} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"pub">>}]),
+
+    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]),
+    timer:sleep(10),
+    {ok, C11} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(100),
+    Metrics = emqttd_metrics:all(),
+    ct:log("Metrics:~p~n", [Metrics]),
+    ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
+    ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
+    emqttc:disconnect(Pub),
+    emqttc:disconnect(C11).
+
+cleanSession_validate1(_) ->
+    {ok, C1} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, true}]),
+    timer:sleep(10),
+    emqttc:subscribe(C1, <<"topic">>, qos1),
+    ok = emqttd_cli:sessions(["list", "transient"]),
+    emqttc:disconnect(C1),
+    {ok, Pub} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"pub">>}]),
+
+    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]),
+    timer:sleep(10),
+    {ok, C11} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(100),
+    Metrics = emqttd_metrics:all(),
+    ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)),
+    ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)),
+    emqttc:disconnect(Pub),
+    emqttc:disconnect(C11).
+
 
 ensure_ok(ok) -> ok;
 ensure_ok({error, {already_started, _}}) -> ok.

+ 68 - 30
test/emqttd_SUITE_data/emqttd.conf

@@ -1,3 +1,8 @@
+
+##===================================================================
+## EMQ Configuration R2.1
+##===================================================================
+
 ##--------------------------------------------------------------------
 ## Node Args
 ##--------------------------------------------------------------------
@@ -11,6 +16,11 @@ node.cookie = emq_dist_cookie
 ## SMP support: enable, auto, disable
 node.smp = auto
 
+## vm.args: -heart
+## Heartbeat monitoring of an Erlang runtime system
+## Value should be 'on' or comment the line
+## node.heartbeat = on
+
 ## Enable kernel poll
 node.kernel_poll = on
 
@@ -40,21 +50,30 @@ node.crash_dump = log/crash.dump
 node.dist_net_ticktime = 60
 
 ## Distributed node port range
-## node.dist_listen_min = 6000
-## node.dist_listen_max = 6999
+## node.dist_listen_min = 6369
+## node.dist_listen_max = 6369
 
 ##--------------------------------------------------------------------
 ## Log
 ##--------------------------------------------------------------------
 
+## Set the log dir
+log.dir = {{ platform_log_dir }}
+
 ## Console log. Enum: off, file, console, both
 log.console = console
 
+## Syslog. Enum: on, off
+log.syslog = on
+
+##  syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency
+log.syslog.level = error
+
 ## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
 log.console.level = error
 
 ## Console log file
-## log.console.file = log/console.log
+## log.console.file = {{ platform_log_dir }}/console.log
 
 ## Error log file
 log.error.file = log/error.log
@@ -64,6 +83,19 @@ log.crash = on
 
 log.crash.file = log/crash.log
 
+##--------------------------------------------------------------------
+## Allow Anonymous and Default ACL
+##--------------------------------------------------------------------
+
+## Allow Anonymous authentication
+mqtt.allow_anonymous = true
+
+## Default ACL File
+mqtt.acl_file = etc/acl.conf
+
+## Cache ACL for PUBLISH
+mqtt.cache_acl = true
+
 ##--------------------------------------------------------------------
 ## MQTT Protocol
 ##--------------------------------------------------------------------
@@ -74,34 +106,38 @@ mqtt.max_clientid_len = 1024
 ## Max Packet Size Allowed, 64K by default.
 mqtt.max_packet_size = 64KB
 
+##--------------------------------------------------------------------
+## MQTT Client
+##--------------------------------------------------------------------
+
 ## Client Idle Timeout (Second)
-mqtt.client_idle_timeout = 30
+mqtt.client.idle_timeout = 30s
 
-## Allow Anonymous authentication
-mqtt.allow_anonymous = true
-
-## Default ACL File
-mqtt.acl_file = etc/acl.conf
+## Enable client Stats: seconds or off
+mqtt.client.enable_stats = off
 
 ##--------------------------------------------------------------------
 ## MQTT Session
 ##--------------------------------------------------------------------
 
+## Upgrade QoS?
+mqtt.session.upgrade_qos = off
+
 ## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
 ## 0 means no limit
-mqtt.session.max_inflight = 100
+mqtt.session.max_inflight = 32
 
-## Retry interval for redelivering QoS1/2 messages.
-mqtt.session.retry_interval = 60
-
-## Awaiting PUBREL Timeout
-mqtt.session.await_rel_timeout = 20
+## Retry Interval for redelivering QoS1/2 messages.
+mqtt.session.retry_interval = 20s
 
 ## Max Packets that Awaiting PUBREL, 0 means no limit
-mqtt.session.max_awaiting_rel = 0
+mqtt.session.max_awaiting_rel = 100
 
-## Statistics Collection Interval(seconds)
-mqtt.session.collect_interval = 0
+## Awaiting PUBREL Timeout
+mqtt.session.await_rel_timeout = 20s
+
+## Enable Statistics at the Interval(seconds)
+mqtt.session.enable_stats = off
 
 ## Expired after 1 day:
 ## w - week
@@ -109,7 +145,7 @@ mqtt.session.collect_interval = 0
 ## h - hour
 ## m - minute
 ## s - second
-mqtt.session.expired_after = 1d
+mqtt.session.expiry_interval = 2h
 
 ##--------------------------------------------------------------------
 ## MQTT Queue
@@ -204,12 +240,13 @@ mqtt.listener.ssl.max_clients = 512
 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
 ## mqtt.listener.ssl.rate_limit = 100,10
 
-## Configuring SSL Options
-## See http://erlang.org/doc/man/ssl.html
-mqtt.listener.ssl.handshake_timeout = 15
+## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html
+### TLS only for POODLE attack
+mqtt.listener.ssl.tls_versions = tlsv1.2,tlsv1.1,tlsv1
+mqtt.listener.ssl.handshake_timeout = 15s
 mqtt.listener.ssl.keyfile = certs/key.pem
 mqtt.listener.ssl.certfile = certs/cert.pem
-## mqtt.listener.ssl.cacertfile = etc/certs/cacert.pem
+## mqtt.listener.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 ## mqtt.listener.ssl.verify = verify_peer
 ## mqtt.listener.ssl.fail_if_no_peer_cert = true
 
@@ -219,13 +256,14 @@ mqtt.listener.http.acceptors = 4
 mqtt.listener.http.max_clients = 64
 
 ## HTTP(SSL) Listener
-## mqtt.listener.https = 8084
-## mqtt.listener.https.acceptors = 4
-## mqtt.listener.https.max_clients = 64
-## mqtt.listener.https.handshake_timeout = 15
-## mqtt.listener.https.certfile = etc/certs/cert.pem
-## mqtt.listener.https.keyfile = etc/certs/key.pem
-## mqtt.listener.https.cacertfile = etc/certs/cacert.pem
+mqtt.listener.https = 8084
+mqtt.listener.https.acceptors = 4
+mqtt.listener.https.max_clients = 64
+mqtt.listener.https.handshake_timeout = 15
+mqtt.listener.https.keyfile = certs/key.pem
+mqtt.listener.https.certfile = certs/cert.pem
+## mqtt.listener.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
+
 ## mqtt.listener.https.verify = verify_peer
 ## mqtt.listener.https.fail_if_no_peer_cert = true
 

+ 145 - 63
test/emqttd_SUITE_data/emqttd.schema

@@ -22,6 +22,19 @@
   hidden
 ]}.
 
+%% @doc http://erlang.org/doc/man/heart.html
+{mapping, "node.heartbeat", "vm_args.-heart", [
+  {datatype, flag},
+  hidden
+]}.
+
+{translation, "vm_args.-heart", fun(Conf) ->
+    case cuttlefish:conf_get("node.heartbeat", Conf) of
+        true  -> "";
+        false -> cuttlefish:invalid("should be 'on' or comment the line!")
+    end
+end}.
+
 %% @doc Enable Kernel Poll
 {mapping, "node.kernel_poll", "vm_args.+K", [
   {default, on},
@@ -135,8 +148,13 @@
 %% Log
 %%--------------------------------------------------------------------
 
+{mapping, "log.dir", "lager.log_dir", [
+  {default, "log"},
+  {datatype, string}
+]}.
+
 {mapping, "log.console", "lager.handlers", [
-  {default, file },
+  {default, file},
   {datatype, {enum, [off, file, console, both]}}
 ]}.
 
@@ -155,6 +173,26 @@
   {datatype, file}
 ]}.
 
+{mapping, "log.syslog", "lager.handlers", [
+  {default,  off},
+  {datatype, flag}
+]}.
+
+{mapping, "log.syslog.identity", "lager.handlers", [
+  {default, "emq"},
+  {datatype, string}
+]}.
+
+{mapping, "log.syslog.facility", "lager.handlers", [
+  {default, local0},
+  {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}}
+]}.
+
+{mapping, "log.syslog.level", "lager.handlers", [
+  {default, err},
+  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
+]}.
+
 {mapping, "log.error.redirect", "lager.error_logger_redirect", [
   {default, on},
   {datatype, flag},
@@ -196,7 +234,16 @@
       both -> [ConsoleHandler, ConsoleFileHandler];
       _ -> []
     end,
-    ConsoleHandlers ++ ErrorHandler
+
+    SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
+      false -> [];
+      true  -> [{lager_syslog_backend,
+                  [cuttlefish:conf_get("log.syslog.identity", Conf),
+                   cuttlefish:conf_get("log.syslog.facility", Conf),
+                   cuttlefish:conf_get("log.syslog.level", Conf)]}]
+    end,
+
+    ConsoleHandlers ++ ErrorHandler ++ SyslogHandler
   end
 }.
 
@@ -226,6 +273,28 @@
   hidden
 ]}.
 
+%%--------------------------------------------------------------------
+%% Allow Anonymous and Default ACL
+%%--------------------------------------------------------------------
+
+%% @doc Allow Anonymous
+{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Default ACL File
+{mapping, "mqtt.acl_file", "emqttd.acl_file", [
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Cache ACL for PUBLISH
+{mapping, "mqtt.cache_acl", "emqttd.cache_acl", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Protocol
 %%--------------------------------------------------------------------
@@ -242,35 +311,42 @@
   {datatype, bytesize}
 ]}.
 
-%% @doc Client Idle Timeout.
-{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
-  {default, 30},
-  {datatype, integer}
-]}.
-
 {translation, "emqttd.protocol", fun(Conf) ->
-  [{max_clientid_len,    cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
-   {max_packet_size,     cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
-   {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
+  [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
+   {max_packet_size,  cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
 end}.
 
-%% @doc Allow Anonymous
-{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
-  {default, false},
-  {datatype, {enum, [true, false]}},
-  hidden
+%%--------------------------------------------------------------------
+%% MQTT Client
+%%--------------------------------------------------------------------
+
+%% @doc Client Idle Timeout.
+{mapping, "mqtt.client.idle_timeout", "emqttd.client", [
+  {default, "30s"},
+  {datatype, {duration, ms}}
 ]}.
 
-%% @doc Default ACL File
-{mapping, "mqtt.acl_file", "emqttd.acl_file", [
-  {datatype, string},
-  hidden
+%% @doc Enable Stats of Client.
+{mapping, "mqtt.client.enable_stats", "emqttd.client", [
+  {default, off},
+  {datatype, [{duration, ms}, flag]}
 ]}.
 
+%% @doc Client
+{translation, "emqttd.client", fun(Conf) ->
+  [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)},
+   {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}]
+end}.
+
 %%--------------------------------------------------------------------
 %% MQTT Session
 %%--------------------------------------------------------------------
 
+%% @doc Upgrade QoS?
+{mapping, "mqtt.session.upgrade_qos", "emqttd.session", [
+  {default, off},
+  {datatype, flag}
+]}.
 %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
 %% 0 means no limit
 {mapping, "mqtt.session.max_inflight", "emqttd.session", [
@@ -278,17 +354,10 @@ end}.
   {datatype, integer}
 ]}.
 
-
 %% @doc Retry interval for redelivering QoS1/2 messages.
 {mapping, "mqtt.session.retry_interval", "emqttd.session", [
-  {default, 60},
-  {datatype, integer}
-]}.
-
-%% @doc Awaiting PUBREL Timeout
-{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
-  {default, 30},
-  {datatype, integer}
+  {default, "20s"},
+  {datatype, {duration, ms}}
 ]}.
 
 %% @doc Max Packets that Awaiting PUBREL, 0 means no limit
@@ -297,25 +366,32 @@ end}.
   {datatype, integer}
 ]}.
 
-%% @doc Statistics Collection Interval(seconds)
-{mapping, "mqtt.session.collect_interval", "emqttd.session", [
-  {default, 0},
-  {datatype, integer}
+%% @doc Awaiting PUBREL Timeout
+{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
+  {default, "20s"},
+  {datatype, {duration, ms}}
+]}.
+
+%% @doc Enable Stats
+{mapping, "mqtt.session.enable_stats", "emqttd.session", [
+  {default, off},
+  {datatype, [{duration, ms}, flag]}
 ]}.
 
-%% @doc Session expired after...
-{mapping, "mqtt.session.expired_after", "emqttd.session", [
-  {default, "2d"},
-  {datatype, {duration, s}}
+%% @doc Session Expiry Interval
+{mapping, "mqtt.session.expiry_interval", "emqttd.session", [
+  {default, "2h"},
+  {datatype, {duration, ms}}
 ]}.
 
 {translation, "emqttd.session", fun(Conf) ->
-  [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
-   {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+  [{upgrade_qos,       cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
+   {max_inflight,      cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
+   {retry_interval,    cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+   {max_awaiting_rel,  cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
    {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
-   {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
-   {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
-   {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
+   {enable_stats,      cuttlefish:conf_get("mqtt.session.enable_stats", Conf)},
+   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}]
 end}.
 
 %%--------------------------------------------------------------------
@@ -331,28 +407,25 @@ end}.
 %% @doc Topic Priority: 0~255, Default is 0
 {mapping, "mqtt.queue.priority", "emqttd.queue", [
   {default, ""},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
 {mapping, "mqtt.queue.max_length", "emqttd.queue", [
   {default, infinity},
-  {datatype, [atom, integer]}
+  {datatype, [integer, {atom, infinity}]}
 ]}.
 
 %% @doc Low-water mark of queued messages
 {mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
   {default, "20%"},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc High-water mark of queued messages
 {mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
   {default, "60%"},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc Queue Qos0 messages?
@@ -405,8 +478,7 @@ end}.
 
 {mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
   {default, true},
-  {datatype, {enum, [true, false]}},
-  hidden
+  {datatype, {enum, [true, false]}}
 ]}.
 
 {translation, "emqttd.pubsub", fun(Conf) ->
@@ -451,7 +523,7 @@ end}.
 %%--------------------------------------------------------------------
 
 {mapping, "mqtt.listener.tcp", "emqttd.listeners", [
-  {default, 1883},
+  %% {default, 1883},
   {datatype, [integer, ip]}
 ]}.
 
@@ -467,8 +539,7 @@ end}.
 
 {mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
   {default, undefined},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 {mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
@@ -497,7 +568,7 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.ssl", "emqttd.listeners", [
-  {default, 8883},
+  %% {default, 8883},
   {datatype, [integer, ip]}
 ]}.
 
@@ -515,9 +586,13 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
 {mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
-  {default, 15},
-  {datatype, integer}
+  {default, "15s"},
+  {datatype, {duration, ms}}
 ]}.
 
 {mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
@@ -541,7 +616,7 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.http", "emqttd.listeners", [
-  {default, 8883},
+  %% {default, 8083},
   {datatype, [integer, ip]}
 ]}.
 
@@ -556,9 +631,8 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.https", "emqttd.listeners", [
-  {default, undefined},
-  {datatype, [integer, ip]},
-  hidden
+  %%{default, 8084},
+  {datatype, [integer, ip]}
 ]}.
 
 {mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
@@ -610,8 +684,16 @@ end}.
                            {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
                            {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
               end,
+
+    SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
+
     SslOpts = fun(Prefix) ->
-                  Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf) * 1000},
+                 Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
+                                undefined -> undefined;
+                                L -> [list_to_atom(V) || V <- L]
+                            end,
+                  Filter([{versions, Versions},
+                          {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined},
                           {keyfile,    cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
                           {certfile,   cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
                           {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
@@ -628,7 +710,7 @@ end}.
                             ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
                             Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
                             [{Name, Port, case Name =:= ssl orelse Name =:= https of
-                                              true  -> [{ssl, SslOpts(Key)} | Opts];
+                                              true  -> [{sslopts, SslOpts(Key)} | Opts];
                                               false -> Opts
                                           end}]
                    end

+ 1 - 1
test/emqttd_net_SUITE.erl

@@ -28,7 +28,7 @@ groups() -> [{keepalive, [], [t_keepalive]}].
 %%--------------------------------------------------------------------
 
 t_keepalive(_) ->
-    KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
+    {ok, KA} = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
     [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])).
 
 keepalive_recv(KA, Acc) ->

+ 3 - 1
test/emqttd_protocol_SUITE.erl

@@ -22,6 +22,8 @@
 
 -include("emqttd.hrl").
 
+-include_lib("eunit/include/eunit.hrl").
+
 -include("emqttd_protocol.hrl").
 
 all() ->
@@ -344,7 +346,7 @@ message_make(_) ->
 message_from_packet(_) ->
     Msg = emqttd_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
     ?assertEqual(1, Msg#mqtt_message.qos),
-    ?assertEqual(10, Msg#mqtt_message.packet_id),
+    ?assertEqual(10, Msg#mqtt_message.pktid),
     ?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
     WillMsg = emqttd_message:from_packet(#mqtt_packet_connect{will_flag  = true,
                                                               will_topic = <<"WillTopic">>,