Przeglądaj źródła

Merge pull request #314 from emqtt/dev-feng

0.11.0
Feng Lee 10 lat temu
rodzic
commit
d407cff2c4

+ 12 - 0
.gitmodules

@@ -4,3 +4,15 @@
 [submodule "plugins/emqttd_plugin_template"]
 	path = plugins/emqttd_plugin_template
 	url = https://github.com/emqtt/emqttd_plugin_template.git
+[submodule "plugins/emqttd_plugin_pgsql"]
+	path = plugins/emqttd_plugin_pgsql
+	url = https://github.com/emqtt/emqttd_plugin_pgsql.git
+[submodule "plugins/emqttd_plugin_mysql"]
+	path = plugins/emqttd_plugin_mysql
+	url = https://github.com/emqtt/emqttd_plugin_mysql.git
+[submodule "plugins/emqttd_sockjs"]
+	path = plugins/emqttd_sockjs
+	url = https://github.com/emqtt/emqttd_sockjs.git
+[submodule "plugins/emqttd_stomp"]
+	path = plugins/emqttd_stomp
+	url = git@github.com:emqtt/emqttd_stomp.git

+ 26 - 0
CHANGELOG.md

@@ -2,6 +2,32 @@
 emqttd ChangeLog
 ==================
 
+0.11.0-beta (2015-09-25)
+-------------------------
+
+Highlight: Rebar to manage plugin dependencies.
+
+Highlight: [Stomp](https://github.com/emqtt/emqttd_stomp) and [SockJS](https://github.com/emqtt/emqttd_sockjs) Plugins!
+
+Improve: add rel/files/emqttd.config.development|production.
+
+Improve: rel/reltool.config.script to release deps of plugin.
+
+Improve: persist mnesia schema on slave nodes.
+
+Improve: use timer:seconds/1 api.
+
+Improve: The binary release will be compiled with R18.1 now.
+
+Bugfix: issue#306 - emqttd_cm should unregister the duplicated client
+
+Bugfix: issue#310 - usage of emqttd_ctl error: 'session list' should be 'sessions list'
+
+Bugfix: issue#311 - './bin/emqttd_ctl sessions list' error
+
+Bugfix: issue#312 - unsubcribe will lead to crash if emqttd_plugin_template plugin loaded
+
+
 0.10.4-beta (2015-09-18)
 -------------------------
 

+ 0 - 6
PLUGIN.md

@@ -1,8 +1,2 @@
 
-git submodule init 
-
-or
-
-git submodule update --remote
-
 Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design).

+ 2 - 0
README.md

@@ -56,6 +56,8 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT
 * [emqttd_plugin_pgsql](https://github.com/emqtt/emqttd_plugin_pgsql) - Authentication with PostgreSQL
 * [emqttd_plugin_kafka](https://github.com/emqtt/emqttd_plugin_kafka) - Publish MQTT Messages to Kafka
 * [emqttd_plugin_redis](https://github.com/emqtt/emqttd_plugin_redis) - Redis Plugin
+* [emqttd_stomp](https://github.com/emqtt/emqttd_stomp) - Stomp Protocol Plugin
+* [emqttd_sockjs](https://github.com/emqtt/emqttd_sockjs) - SockJS(Stomp) Plugin
 
 
 ## Dashboard

+ 1 - 0
plugins/emqttd_plugin_mysql

@@ -0,0 +1 @@
+Subproject commit c2f32b5f29d0b3ad7446aef1dda1c67923d27a49

+ 1 - 0
plugins/emqttd_plugin_pgsql

@@ -0,0 +1 @@
+Subproject commit f940996a2bb28cfcd86e97803b2fc34200605239

+ 1 - 0
plugins/emqttd_sockjs

@@ -0,0 +1 @@
+Subproject commit 1c643329c5e2aa1c2d01e79bb16065f11447adcd

+ 1 - 0
plugins/emqttd_stomp

@@ -0,0 +1 @@
+Subproject commit 658667cd5965c0f3927ab0a8e382f2d95c8f2182

+ 244 - 0
rel/files/emqttd.config.development

@@ -0,0 +1,244 @@
+% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+[{kernel, 
+	[{start_timer, true},
+	 {start_pg2, true}
+ ]},
+ {sasl, [
+	{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
+ ]},
+ {ssl, [
+    %{versions, ['tlsv1.2', 'tlsv1.1']}
+ ]},
+ {lager, [
+    {colored, true},
+    {async_threshold, 1000},
+	{error_logger_redirect, false},
+	{crash_log, "log/emqttd_crash.log"},
+	{handlers, [
+		{lager_console_backend, info},
+		{lager_file_backend, [
+            {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
+			{file, "log/emqttd_info.log"},
+			{level, info},
+			{size, 104857600},
+			{date, "$D0"},
+			{count, 30}
+		]},
+		{lager_file_backend, [
+            {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
+			{file, "log/emqttd_error.log"},
+			{level, error},
+			{size, 104857600},
+			{date, "$D0"},
+			{count, 30}
+		]}
+	]}
+ ]},
+ {esockd, [
+    {logger, {lager, info}}
+ ]},
+ {emqttd, [
+    %% Authentication and Authorization
+    {access, [
+        %% Authetication. Anonymous Default
+        {auth, [
+            %% Authentication with username, password
+            %{username, []},
+            
+            %% Authentication with clientid
+            %{clientid, [{password, no}, {file, "etc/clients.config"}]},
+
+            %% Authentication with LDAP
+            % {ldap, [
+            %    {servers, ["localhost"]},
+            %    {port, 389},
+            %    {timeout, 30},
+            %    {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
+            %    {ssl, fasle},
+            %    {sslopts, [
+            %        {"certfile", "ssl.crt"},
+            %        {"keyfile", "ssl.key"}]}
+            % ]},
+
+            %% Allow all
+            {anonymous, []}
+        ]},
+        %% ACL config
+        {acl, [
+            %% Internal ACL module
+            {internal,  [{file, "etc/acl.config"}, {nomatch, allow}]}
+        ]}
+    ]},
+    %% MQTT Protocol Options
+    {mqtt, [
+        %% Packet
+        {packet, [
+            %% Max ClientId Length Allowed
+            {max_clientid_len, 1024},
+            %% Max Packet Size Allowed, 64K default
+            {max_packet_size,  65536}
+        ]},
+        %% Client
+        {client, [
+            %TODO: Network ingoing limit
+            %{ingoing_rate_limit, '64KB/s'}
+            %TODO: Reconnet control
+        ]},
+        %% Session
+        {session, [
+            %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
+            %% 0 means no limit
+            {max_inflight, 100},
+
+            %% Max retries for unack Qos1/2 messages
+            {unack_retries, 3},
+
+            %% Retry after 4, 8, 16 seconds
+            {unack_timeout, 4},
+
+            %% Awaiting PUBREL Timeout
+            {await_rel_timeout, 8},
+
+            %% Max Packets that Awaiting PUBREL, 0 means no limit
+            {max_awaiting_rel, 0},
+
+            %% Statistics Collection Interval(seconds)
+            {collect_interval, 10},
+
+            %% Expired after 2 days
+            {expired_after, 48}
+
+        ]},
+        %% Session
+        {queue, [
+            %% Max queue length. enqueued messages when persistent client disconnected, 
+            %% or inflight window is full.
+            {max_length, 100},
+
+            %% Low-water mark of queued messsages
+            {low_watermark, 0.2},
+
+            %% High-water mark of queued messsages
+            {high_watermark, 0.6},
+
+            %% Queue Qos0 messages?
+            {queue_qos0, true}
+        ]}
+    ]},
+    %% Broker Options
+    {broker, [
+        %% System interval of publishing broker $SYS messages
+        {sys_interval, 60},
+
+        %% Retained messages
+        {retained, [
+            %% Max number of retained messages
+            {max_message_num, 100000},
+            %% Max Payload Size of retained message
+            {max_playload_size, 65536}
+        ]},
+        %% PubSub
+        {pubsub, [
+            %% default should be scheduler numbers
+            %% {pool_size, 8}
+        ]},
+        %% Bridge
+        {bridge, [
+            %%TODO: bridge queue size
+            {max_queue_len, 10000},
+
+            %% Ping Interval of bridge node
+            {ping_down_interval, 1} %seconds
+        ]}
+    ]},
+    %% Modules
+    {modules, [
+        %% Client presence management module.
+        %% Publish messages when client connected or disconnected
+        {presence, [{qos, 0}]}
+
+        %% Subscribe topics automatically when client connected
+        %% {autosub, [{"$Q/client/$c", 0}]}
+
+        %% Rewrite rules
+        %% {rewrite, [{file, "etc/rewrite.config"}]}
+        
+    ]},
+    %% Plugins
+    {plugins, [
+        %% Plugin App Library Dir
+        {plugins_dir, "./plugins"},
+
+        %% File to store loaded plugin names.
+        {loaded_file, "./data/loaded_plugins"}
+    ]},
+    %% Listeners
+    {listeners, [
+        {mqtt, 1883, [
+            %% Size of acceptor pool
+            {acceptors, 16},
+            %% Maximum number of concurrent clients
+            {max_clients, 512},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 512}
+                %Set buffer if hight thoughtput
+                %{recbuf, 4096},
+                %{sndbuf, 4096}
+                %{buffer, 4096},
+            ]}
+        ]},
+        {mqtts, 8883, [
+            %% Size of acceptor pool
+            {acceptors, 4},
+            %% Maximum number of concurrent clients
+            {max_clients, 512},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% SSL certificate and key files
+            {ssl, [{certfile, "etc/ssl/ssl.crt"},
+                   {keyfile,  "etc/ssl/ssl.key"}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 1024}
+                %{buffer, 4096},
+            ]}
+        ]},
+        %% WebSocket over HTTPS Listener
+        %% {https, 8083, [
+        %%  %% Size of acceptor pool
+        %%  {acceptors, 4},
+        %%  %% Maximum number of concurrent clients
+        %%  {max_clients, 512},
+        %%  %% Socket Access Control
+        %%  {access, [{allow, all}]},
+        %%  %% SSL certificate and key files
+        %%  {ssl, [{certfile, "etc/ssl/ssl.crt"},
+        %%         {keyfile,  "etc/ssl/ssl.key"}]},
+        %%  %% Socket Options
+        %%  {sockopts, [
+        %%      %{buffer, 4096},
+        %%      {backlog, 1024}
+        %%  ]}
+        %%]},
+        %% HTTP and WebSocket Listener
+        {http, 8083, [
+            %% Size of acceptor pool
+            {acceptors, 4},
+            %% Maximum number of concurrent clients
+            {max_clients, 64},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 1024}
+                %{buffer, 4096},
+            ]}
+        ]}
+    ]}
+ ]}
+].
+

+ 236 - 0
rel/files/emqttd.config.production

@@ -0,0 +1,236 @@
+% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+[{kernel, 
+	[{start_timer, true},
+	 {start_pg2, true}
+ ]},
+ {sasl, [
+	{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
+ ]},
+ {ssl, [
+    %{versions, ['tlsv1.2', 'tlsv1.1']}
+ ]},
+ {lager, [
+    {colored, true},
+    {async_threshold, 5000},
+	{error_logger_redirect, false},
+	{crash_log, "log/emqttd_crash.log"},
+	{handlers, [
+		{lager_console_backend, info},
+		{lager_file_backend, [
+            {formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
+			{file, "log/emqttd_error.log"},
+			{level, error},
+			{size, 104857600},
+			{date, "$D0"},
+			{count, 30}
+		]}
+	]}
+ ]},
+ {esockd, [
+    {logger, {lager, error}}
+ ]},
+ {emqttd, [
+    %% Authentication and Authorization
+    {access, [
+        %% Authetication. Anonymous Default
+        {auth, [
+            %% Authentication with username, password
+            %{username, []},
+            
+            %% Authentication with clientid
+            %{clientid, [{password, no}, {file, "etc/clients.config"}]},
+
+            %% Authentication with LDAP
+            % {ldap, [
+            %    {servers, ["localhost"]},
+            %    {port, 389},
+            %    {timeout, 30},
+            %    {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
+            %    {ssl, fasle},
+            %    {sslopts, [
+            %        {"certfile", "ssl.crt"},
+            %        {"keyfile", "ssl.key"}]}
+            % ]},
+
+            %% Allow all
+            {anonymous, []}
+        ]},
+        %% ACL config
+        {acl, [
+            %% Internal ACL module
+            {internal,  [{file, "etc/acl.config"}, {nomatch, allow}]}
+        ]}
+    ]},
+    %% MQTT Protocol Options
+    {mqtt, [
+        %% Packet
+        {packet, [
+            %% Max ClientId Length Allowed
+            {max_clientid_len, 1024},
+            %% Max Packet Size Allowed, 64K default
+            {max_packet_size,  65536}
+        ]},
+        %% Client
+        {client, [
+            %TODO: Network ingoing limit
+            %{ingoing_rate_limit, '64KB/s'}
+            %TODO: Reconnet control
+        ]},
+        %% Session
+        {session, [
+            %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
+            %% 0 means no limit
+            {max_inflight, 100},
+
+            %% Max retries for unack Qos1/2 messages
+            {unack_retries, 3},
+
+            %% Retry after 8, 16, 32 seconds
+            {unack_timeout, 8},
+
+            %% Awaiting PUBREL Timeout
+            {await_rel_timeout, 8},
+
+            %% Max Packets that Awaiting PUBREL, 0 means no limit
+            {max_awaiting_rel, 0},
+
+            %% Statistics Collection Interval(seconds)
+            {collect_interval, 20},
+
+            %% Expired after 2 days
+            {expired_after, 48}
+
+        ]},
+        %% Session
+        {queue, [
+            %% Max queue length. enqueued messages when persistent client disconnected, 
+            %% or inflight window is full.
+            {max_length, 100},
+
+            %% Low-water mark of queued messsages
+            {low_watermark, 0.2},
+
+            %% High-water mark of queued messsages
+            {high_watermark, 0.6},
+
+            %% Queue Qos0 messages?
+            {queue_qos0, true}
+        ]}
+    ]},
+    %% Broker Options
+    {broker, [
+        %% System interval of publishing broker $SYS messages
+        {sys_interval, 60},
+
+        %% Retained messages
+        {retained, [
+            %% Max number of retained messages
+            {max_message_num, 100000},
+            %% Max Payload Size of retained message
+            {max_playload_size, 65536}
+        ]},
+        %% PubSub
+        {pubsub, [
+            %% default should be scheduler numbers
+            {pool_size, 8}
+        ]},
+        %% Bridge
+        {bridge, [
+            %%TODO: bridge queue size
+            {max_queue_len, 10000},
+
+            %% Ping Interval of bridge node
+            {ping_down_interval, 1} %seconds
+        ]}
+    ]},
+    %% Modules
+    {modules, [
+        %% Client presence management module.
+        %% Publish messages when client connected or disconnected
+        {presence, [{qos, 0}]}
+
+        %% Subscribe topics automatically when client connected
+        %% {autosub, [{"$Q/client/$c", 0}]}
+
+        %% Rewrite rules
+        %% {rewrite, [{file, "etc/rewrite.config"}]}
+        
+    ]},
+    %% Plugins
+    {plugins, [
+        %% Plugin App Library Dir
+        {plugins_dir, "./plugins"},
+
+        %% File to store loaded plugin names.
+        {loaded_file, "./data/loaded_plugins"}
+    ]},
+    %% Listeners
+    {listeners, [
+        {mqtt, 1883, [
+            %% Size of acceptor pool
+            {acceptors, 16},
+            %% Maximum number of concurrent clients
+            {max_clients, 8192},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 512}
+                %Set buffer if hight thoughtput
+                %{recbuf, 4096},
+                %{sndbuf, 4096}
+                %{buffer, 4096},
+            ]}
+        ]},
+        {mqtts, 8883, [
+            %% Size of acceptor pool
+            {acceptors, 4},
+            %% Maximum number of concurrent clients
+            {max_clients, 512},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% SSL certificate and key files
+            {ssl, [{certfile, "etc/ssl/ssl.crt"},
+                   {keyfile,  "etc/ssl/ssl.key"}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 1024}
+                %{buffer, 4096},
+            ]}
+        ]},
+        %% WebSocket over HTTPS Listener
+        %% {https, 8083, [
+        %%  %% Size of acceptor pool
+        %%  {acceptors, 4},
+        %%  %% Maximum number of concurrent clients
+        %%  {max_clients, 512},
+        %%  %% Socket Access Control
+        %%  {access, [{allow, all}]},
+        %%  %% SSL certificate and key files
+        %%  {ssl, [{certfile, "etc/ssl/ssl.crt"},
+        %%         {keyfile,  "etc/ssl/ssl.key"}]},
+        %%  %% Socket Options
+        %%  {sockopts, [
+        %%      %{buffer, 4096},
+        %%      {backlog, 1024}
+        %%  ]}
+        %%]},
+        %% HTTP and WebSocket Listener
+        {http, 8083, [
+            %% Size of acceptor pool
+            {acceptors, 4},
+            %% Maximum number of concurrent clients
+            {max_clients, 64},
+            %% Socket Access Control
+            {access, [{allow, all}]},
+            %% Socket Options
+            {sockopts, [
+                {backlog, 1024}
+                %{buffer, 4096},
+            ]}
+        ]}
+    ]}
+ ]}
+].
+

+ 2 - 2
rel/files/emqttd_ctl

@@ -351,8 +351,8 @@ case "$1" in
         echo "  clients list                  #list all clients"
         echo "  clients show <ClientId>       #show a client"
         echo "  clients kick <ClientId>       #kick a client"
-        echo "  session list                  #list all sessions"
-        echo "  session show <ClientId>       #show a sessions"
+        echo "  sessions list                 #list all sessions"
+        echo "  sessions show <ClientId>      #show a sessions"
         echo "  ----------------------------------------------------------------"
         echo "  plugins list                  #query loaded plugins"
         echo "  plugins load <Plugin>         #load plugin"

+ 2 - 2
rel/reltool.config

@@ -13,7 +13,6 @@
 		 syntax_tools,
 		 ssl,
 		 crypto,
-         %mnesia,
          eldap,
          xmerl,
 		 os_mon,
@@ -84,7 +83,8 @@
            {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
            {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
            {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
-           {template, "files/emqttd.config", "etc/emqttd.config"},
+           {template, "files/emqttd.config.development", "etc/emqttd.config"},
+           {template, "files/emqttd.config.production", "etc/emqttd.config.production"},
            {template, "files/acl.config", "etc/acl.config"},
            {template, "files/rewrite.config", "etc/rewrite.config"},
            {template, "files/clients.config", "etc/clients.config"},

+ 18 - 0
rel/reltool.config.script

@@ -0,0 +1,18 @@
+%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+
+Sys         = proplists:get_value(sys, CONFIG),
+IncludeApps = [App || {app, App, _} <- Sys],
+
+[DepsDir]   = proplists:get_value(lib_dirs, Sys),
+DepApps     = lists:map(fun(AppFile) ->
+                        {ok, [{application, Name, Attrs}]}
+                            = file:consult(filename:join(DepsDir, AppFile)),
+                        Name
+                      end, filelib:wildcard("*/ebin/*.app", DepsDir)),
+AppendApps  = DepApps -- IncludeApps,
+Cond        = [{mod_cond, app}, {incl_cond, include}],
+
+NewSys      = lists:append(Sys, [{app, App, Cond} || App <- AppendApps]),
+
+lists:keyreplace(sys, 1, CONFIG, {sys, NewSys}).

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.10.4"},
+  {vsn, "0.11.0"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 9 - 8
src/emqttd_ctl.erl

@@ -162,15 +162,15 @@ sessions(["list"]) ->
     dump(session, mqtt_transient_session),
     dump(session, mqtt_persistent_session);
 
-sessions(["show", ClientId0]) ->
-    ClientId = list_to_binary(ClientId0),
-    case {ets:lookup(mqtt_transient_session, ClientId),
-          ets:lookup(mqtt_persistent_session, ClientId)} of
+sessions(["show", ClientId]) ->
+    MP = {{list_to_binary(ClientId), '_'}, '_'},
+    case {ets:match_object(mqtt_transient_session, MP),
+          ets:match_object(mqtt_persistent_session, MP)} of
         {[], []} ->
             ?PRINT_MSG("Not Found.~n");
-        {[SessInfo], _} -> 
+        {[SessInfo], _} ->
             print(session, SessInfo);
-        {_, [SessInfo]} -> 
+        {_, [SessInfo]} ->
             print(session, SessInfo)
     end.
     
@@ -318,7 +318,7 @@ print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess,
              emqttd_net:format(Peername),
              emqttd_util:now_to_secs(ConnectedAt)]);
 
-print(session, {ClientId, SessInfo}) ->
+print(session, {{ClientId, _ClientPid}, SessInfo}) ->
     InfoKeys = [clean_sess, 
                 max_inflight,
                 inflight_queue,
@@ -330,7 +330,8 @@ print(session, {ClientId, SessInfo}) ->
                 created_at,
                 subscriptions],
     ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
-           "message_queue=~w, message_dropped=~w, awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
+           "message_queue=~w, message_dropped=~w, "
+           "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
            "created_at=~w, subscriptions=~s)~n",
             [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]);
 

+ 2 - 2
src/emqttd_keepalive.erl

@@ -42,7 +42,7 @@
 %%------------------------------------------------------------------------------
 new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
     {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]),
-	Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
+	Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
 	#keepalive {transport   = Transport,
                 socket      = Socket, 
                 recv_oct    = RecvOct, 
@@ -67,7 +67,7 @@ resume(KeepAlive = #keepalive {transport   = Transport,
         true ->
             %need?
             cancel(Ref),
-            NewRef = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
+            NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
             {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}}
     end.
 

+ 3 - 3
src/emqttd_mnesia.erl

@@ -146,10 +146,10 @@ cluster(Node) ->
             throw({error, failed_to_connect_extra_db_nodes});
         {ok, Nodes} ->
             case lists:member(Node, Nodes) of
-                true -> lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]);
+                true ->  lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]);
                 false -> lager:error("mnesia failed to connect extra_db_node '~s'!", [Node])
-            end
-
+            end,
+            mnesia:change_table_copy_type(schema, node(), disc_copies)
     end,
     copy_tables(),
     wait_for_tables().

+ 4 - 3
src/emqttd_protocol.erl

@@ -311,13 +311,14 @@ trace(send, Packet, #proto_state{peername  = Peername, client_id = ClientId}) ->
 redeliver({?PUBREL, PacketId}, State) ->
     send(?PUBREL_PACKET(PacketId), State).
 
-shutdown(duplicate_id, _State) ->
-    quiet; %%
-
 shutdown(Error, #proto_state{client_id = undefined}) ->
     lager:info("Protocol shutdown ~p", [Error]),
     ignore;
 
+shutdown(duplicate_id, #proto_state{client_id = ClientId}) ->
+    %% unregister the device
+    emqttd_cm:unregister(ClientId);
+
 %% TODO: ClientId??
 shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
 	lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",

+ 3 - 3
src/emqttd_session.erl

@@ -664,8 +664,8 @@ next_packet_id(Session = #session{packet_id = 16#ffff}) ->
 next_packet_id(Session = #session{packet_id = Id}) ->
     Session#session{packet_id = Id + 1}.
 
-timer(Timeout, TimeoutMsg) ->
-    erlang:send_after(Timeout * 1000, self(), TimeoutMsg).
+timer(TimeoutSec, TimeoutMsg) ->
+    erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg).
 
 cancel_timer(undefined) -> 
 	undefined;
@@ -679,7 +679,7 @@ start_collector(Session = #session{collect_interval = 0}) ->
     Session;
 
 start_collector(Session = #session{collect_interval = Interval}) ->
-    TRef = erlang:send_after(Interval * 1000, self(), collect_info),
+    TRef = erlang:send_after(timer:seconds(Interval), self(), collect_info),
     Session#session{collect_timer = TRef}.
 
 info(#session{clean_sess      = CleanSess,