Feng 9 лет назад
Родитель
Сommit
61b2ab48ad

+ 10 - 2
README.md

@@ -7,10 +7,14 @@ emqttd is fully open source and licensed under the Apache Version 2.0. emqttd im
 
 emqttd requires Erlang R18+ to build since 1.1 release.
 
-Demo Server: tcp://t.emqtt.io:1883
-
 Follow us on Twitter: [@emqtt](https://twitter.com/emqtt)
 
+## Cluster
+
+The **q.emqtt.com** hosts a public emqttd cluster on [QingCloud](https://qingcloud.com):
+
+![qing_cluster](http://emqtt.io/static/img/public_cluster.png)
+
 ## Goals
 
 The emqttd project is aimed to implement a scalable, distributed, extensible open-source MQTT broker for IoT, M2M and Mobile applications that hope to handle millions of concurrent MQTT clients.
@@ -156,6 +160,10 @@ Benchmark Report: [benchmark for 0.12.0 release](https://github.com/emqtt/emqttd
 * [@phanimahesh](https://github.com/phanimahesh)
 * [@dvliman](https://github.com/dvliman)
 
+## Partners
+
+[QingCloud](https://qingcloud.com) is the world’s first IaaS provider that can deliver any number of IT resources in seconds and adopts a second-based billing system. QingCloud is committed to providing a reliable, secure, on-demand and real-time IT resource platform with excellent performance, which includes all components of a complete IT infrastructure system: computing, storage, networking and security.
+
 ## Author
 
 Feng Lee <feng@emqtt.io>

+ 22 - 0
docs/source/changes.rst

@@ -5,6 +5,28 @@
 Changes
 =======
 
+.. _release_1.1.2:
+
+-------------
+Version 1.1.2
+-------------
+
+*Release Date: 2016-06-30*
+
+Upgrade mysql-otp driver to 1.2.0 (#564, #523, #586, #596)
+
+Fix WebSocket Client Leak (PR #612)
+
+java.io.EOFException using paho java client (#551)
+
+Send message from paho java client to javascript client (#552)
+
+Compatible with the Qos0 PUBREL packet (#575)
+
+Empty clientId with non-clean session accepted (#599)
+
+Update docs to fix typos (#601, #607)
+
 .. _release_1.1.1:
 
 -------------

+ 1 - 1
docs/source/guide.rst

@@ -320,7 +320,7 @@ Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config:
         ...
 
         %% comment this query, the acl will be disabled
-        {aclquery, "select * from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
+        {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
 
         %% If no rules matched, return...
         {acl_nomatch, allow}

+ 1 - 0
docs/source/index.rst

@@ -53,6 +53,7 @@ Contents:
    plugins
    tune
    changes
+   upgrade
 
 -------
 License

+ 8 - 2
docs/source/tune.rst

@@ -72,8 +72,14 @@ Connection Tracking::
 The TIME-WAIT Buckets Pool, Recycling and Reuse::
 
     net.ipv4.tcp_max_tw_buckets=1048576
-    net.ipv4.tcp_tw_recycle = 1
-    net.ipv4.tcp_tw_reuse = 1
+
+    # Enable fast recycling of TIME_WAIT sockets.  Enabling this
+    # option is not recommended for devices communicating with the
+    # general Internet or using NAT (Network Address Translation).
+    # Since some NAT gateways pass through IP timestamp values, one
+    # IP can appear to have non-increasing timestamps.
+    # net.ipv4.tcp_tw_recycle = 1
+    # net.ipv4.tcp_tw_reuse = 1
 
 Timeout for FIN-WAIT-2 sockets::
 

+ 33 - 0
docs/source/upgrade.rst

@@ -0,0 +1,33 @@
+
+.. _upgrade:
+
+=======
+Upgrade
+=======
+
+.. _upgrade_1.1.2:
+
+----------------
+Upgrade to 1.1.2
+----------------
+
+.. NOTE:: 1.0+ releases can be upgraded to 1.1.2 smoothly
+
+Steps:
+
+1. Download and install emqttd-1.1.2 to the new directory, for example::
+
+    Old installation: /opt/emqttd_1_0_0/
+
+    New installation: /opt/emqttd_1_1_2/
+
+2. Copy the 'etc/' and 'data/' from the old installation::
+
+    cp -R /opt/emqttd_1_0_0/etc/* /opt/emqttd_1_1_2/etc/
+
+    cp -R /opt/emqttd_1_0_0/data/* /opt/emqttd_1_1_2/data/
+
+3. Copy the plugins/{plugin}/etc/* from the old installation if you loaded plugins.
+
+4. Stop the old emqttd, and start the new one.
+

BIN
docs/specs/MQTT-SN_spec_v1.2.pdf


BIN
docs/specs/mqtt-v3.1.1.pdf


+ 2 - 2
rel/files/emqttd.config.development

@@ -59,8 +59,8 @@
             %    {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
             %    {ssl, fasle},
             %    {sslopts, [
-            %        {"certfile", "ssl.crt"},
-            %        {"keyfile", "ssl.key"}]}
+            %        {certfile, "ssl.crt"},
+            %        {keyfile, "ssl.key"}]}
             % ]},
 
             %% Allow all

+ 2 - 2
rel/files/emqttd.config.production

@@ -51,8 +51,8 @@
             %    {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
             %    {ssl, fasle},
             %    {sslopts, [
-            %        {"certfile", "ssl.crt"},
-            %        {"keyfile", "ssl.key"}]}
+            %        {certfile, "ssl.crt"},
+            %        {keyfile, "ssl.key"}]}
             % ]},
 
             %% Allow all

+ 1 - 0
rel/files/vm.args

@@ -58,3 +58,4 @@
 ## Tweak GC to run more often
 -env ERL_FULLSWEEP_AFTER 1000
 
+-env ERL_CRASH_DUMP log/emqttd_crash.dump

+ 2 - 1
src/emqttd_app.erl

@@ -89,6 +89,7 @@ start_servers(Sup) ->
                {"emqttd client manager", {supervisor, emqttd_cm_sup}},
                {"emqttd session manager", {supervisor, emqttd_sm_sup}},
                {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
+               {"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}},
                {"emqttd broker", emqttd_broker},
                {"emqttd alarm", emqttd_alarm},
                {"emqttd mod supervisor", emqttd_mod_sup},
@@ -187,7 +188,7 @@ start_listener({https, ListenOn, Opts}) ->
 
 start_listener(Protocol, ListenOn, Opts) ->
     MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
-    esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
+    {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
 
 merge_sockopts(Options) ->
     SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,

+ 7 - 1
src/emqttd_auth_username.erl

@@ -40,6 +40,11 @@
 %%--------------------------------------------------------------------
 %% CLI
 %%--------------------------------------------------------------------
+cli(["list"]) ->
+    if_enabled(fun() ->
+        Usernames = mnesia:dirty_all_keys(?AUTH_USERNAME_TAB),
+        [?PRINT("~s~n", [Username]) || Username <- Usernames]
+    end);
 
 cli(["add", Username, Password]) ->
     if_enabled(fun() ->
@@ -52,7 +57,8 @@ cli(["del", Username]) ->
     end);
 
 cli(_) ->
-    ?USAGE([{"users add <Username> <Password>", "Add User"},
+    ?USAGE([{"users list", "List users"},
+            {"users add <Username> <Password>", "Add User"},
             {"users del <Username>", "Delete User"}]).
 
 if_enabled(Fun) ->

+ 14 - 1
src/emqttd_client.erl

@@ -26,7 +26,8 @@
 -include("emqttd_internal.hrl").
 
 %% API Function Exports
--export([start_link/2, session/1, info/1, kick/1]).
+-export([start_link/2, session/1, info/1, kick/1,
+         set_rate_limit/2, get_rate_limit/1]).
 
 %% SUB/UNSUB Asynchronously. Called by plugins.
 -export([subscribe/2, unsubscribe/2]).
@@ -59,6 +60,12 @@ info(CPid) ->
 kick(CPid) ->
     gen_server:call(CPid, kick).
 
+set_rate_limit(Cpid, Rl) ->
+    gen_server:call(Cpid, {set_rate_limit, Rl}).
+
+get_rate_limit(Cpid) ->
+    gen_server:call(Cpid, get_rate_limit).
+
 subscribe(CPid, TopicTable) ->
     gen_server:cast(CPid, {subscribe, TopicTable}).
 
@@ -120,6 +127,12 @@ handle_call(info, _From, State = #client_state{connection  = Connection,
 handle_call(kick, _From, State) ->
     {stop, {shutdown, kick}, ok, State};
 
+handle_call({set_rate_limit, Rl}, _From, State) ->
+    {reply, ok, State#client_state{rate_limit = Rl}};
+
+handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
+    {reply, Rl, State};
+
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 

+ 3 - 3
src/emqttd_http.erl

@@ -52,13 +52,14 @@ handle_request('POST', "/mqtt/publish", Req) ->
 %%--------------------------------------------------------------------
 %% MQTT Over WebSocket
 %%--------------------------------------------------------------------
+
 handle_request('GET', "/mqtt", Req) ->
     lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),
     Proto   = Req:get_header_value("Sec-WebSocket-Protocol"),
     case {is_websocket(Upgrade), Proto} of
         {true, "mqtt" ++ _Vsn} ->
-            emqttd_ws_client:start_link(Req);
+            emqttd_ws:handle_request(Req);
         {false, _} ->
             lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
             Req:respond({400, [], <<"Bad Request">>});
@@ -144,13 +145,12 @@ authorized(Req) ->
 user_passwd(BasicAuth) ->
     list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). 
 
-
 int(S) -> list_to_integer(S).
 
 bool("0") -> false;
 bool("1") -> true.
 
-is_websocket(Upgrade) -> 
+is_websocket(Upgrade) ->
     Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
 
 docroot() ->

+ 1 - 2
src/emqttd_parser.erl

@@ -36,8 +36,7 @@ new(Opts) ->
     fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
 
 limit(Opts) ->
-    #mqtt_packet_limit{max_packet_size = 
-                        proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
+    #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
 
 %% @doc Parse MQTT Packet
 -spec(parse(binary(), {none, [option()]} | fun())

+ 2 - 2
src/emqttd_session_sup.erl

@@ -38,7 +38,7 @@ start_session(CleanSess, ClientId, ClientPid) ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    {ok, {{simple_one_for_one, 10, 10},
+    {ok, {{simple_one_for_one, 0, 1},
           [{session, {emqttd_session, start_link, []},
-              temporary, 10000, worker, [emqttd_session]}]}}.
+              temporary, 5000, worker, [emqttd_session]}]}}.
 

+ 75 - 0
src/emqttd_ws.erl

@@ -0,0 +1,75 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_ws).
+
+-export([handle_request/1, ws_loop/3]).
+
+%% WebSocket Loop State
+-record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}).
+
+-define(WSLOG(Level, Peer, Format, Args),
+        lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
+
+%%--------------------------------------------------------------------
+%% Handle WebSocket Request
+%%--------------------------------------------------------------------
+
+%% @doc Handle WebSocket Request.
+handle_request(Req) ->
+    Peer = Req:get(peer),
+    PktOpts = emqttd:env(mqtt, packet),
+    ParserFun = emqttd_parser:new(PktOpts),
+    {ReentryWs, ReplyChannel} = upgrade(Req),
+    {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
+    ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid,
+                             packet_opts = PktOpts, parser_fun = ParserFun}).
+
+%% @doc Upgrade WebSocket.
+%% @private
+upgrade(Req) ->
+    mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
+
+%%--------------------------------------------------------------------
+%% Receive Loop
+%%--------------------------------------------------------------------
+
+%% @doc WebSocket frame receive loop.
+ws_loop(<<>>, State, _ReplyChannel) ->
+    State;
+ws_loop([<<>>], State, _ReplyChannel) ->
+    State;
+ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
+                                     parser_fun = ParserFun}, ReplyChannel) ->
+    ?WSLOG(debug, Peer, "RECV ~p", [Data]),
+    case catch ParserFun(iolist_to_binary(Data)) of
+        {more, NewParser} ->
+            State#wsocket_state{parser_fun = NewParser};
+        {ok, Packet, Rest} ->
+            gen_server:cast(ClientPid, {received, Packet}),
+            ws_loop(Rest, reset_parser(State), ReplyChannel);
+        {error, Error} ->
+            ?WSLOG(error, Peer, "Frame error: ~p", [Error]),
+            exit({shutdown, Error});
+        {'EXIT', Reason} ->
+            ?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
+            ?WSLOG(error, Peer, "Error data: ~p", [Data]),
+            exit({shutdown, parser_error})
+    end.
+
+reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
+    State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
+

+ 46 - 86
src/emqttd_ws_client.erl

@@ -16,42 +16,31 @@
 
 -module(emqttd_ws_client).
 
+-behaviour(gen_server).
+
 -include("emqttd.hrl").
 
 -include("emqttd_protocol.hrl").
 
 %% API Exports
--export([start_link/1, ws_loop/3, session/1, info/1, kick/1]).
+-export([start_link/4, session/1, info/1, kick/1]).
 
 %% SUB/UNSUB Asynchronously
 -export([subscribe/2, unsubscribe/2]).
 
--behaviour(gen_server).
-
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
-%% WebSocket Loop State
--record(wsocket_state, {request, client_pid, packet_opts, parser_fun}).
-
 %% WebSocket Client State
--record(wsclient_state, {ws_pid, request, proto_state, keepalive}).
-
--define(WSLOG(Level, Format, Args, Req),
-              lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])).
-
-%% @doc Start WebSocket client.
-start_link(Req) ->
-    PktOpts = emqttd:env(mqtt, packet),
-    ParserFun = emqttd_parser:new(PktOpts),
-    {ReentryWs, ReplyChannel} = upgrade(Req),
-    Params = [self(), Req, ReplyChannel, PktOpts],
-    {ok, ClientPid} = gen_server:start_link(?MODULE, Params, []),
-    ReentryWs(#wsocket_state{request     = Req,
-                             client_pid  = ClientPid,
-                             packet_opts = PktOpts,
-                             parser_fun  = ParserFun}).
+-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}).
+
+-define(WSLOG(Level, Peer, Format, Args),
+              lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
+
+%% @doc Start WebSocket Client.
+start_link(MqttEnv, WsPid, Req, ReplyChannel) ->
+    gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []).
 
 session(CPid) ->
     gen_server:call(CPid, session, infinity).
@@ -68,66 +57,40 @@ subscribe(CPid, TopicTable) ->
 unsubscribe(CPid, Topics) ->
     gen_server:cast(CPid, {unsubscribe, Topics}).
 
-%% @private
-%% @doc Upgrade WebSocket.
-upgrade(Req) ->
-    mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
-
-%% @doc WebSocket frame receive loop.
-ws_loop(<<>>, State, _ReplyChannel) ->
-    State;
-ws_loop([<<>>], State, _ReplyChannel) ->
-    State;
-ws_loop(Data, State = #wsocket_state{request    = Req,
-                                     client_pid = ClientPid,
-                                     parser_fun = ParserFun}, ReplyChannel) ->
-    ?WSLOG(debug, "RECV ~p", [Data], Req),
-    case catch ParserFun(iolist_to_binary(Data)) of
-        {more, NewParser} ->
-            State#wsocket_state{parser_fun = NewParser};
-        {ok, Packet, Rest} ->
-            gen_server:cast(ClientPid, {received, Packet}),
-            ws_loop(Rest, reset_parser(State), ReplyChannel);
-        {error, Error} ->
-            ?WSLOG(error, "Frame error: ~p", [Error], Req),
-            exit({shutdown, Error});
-        {'EXIT', Reason} ->
-            ?WSLOG(error, "Frame error: ~p", [Reason], Req),
-            ?WSLOG(error, "Error data: ~p", [Data], Req),
-            exit({shutdown, parser_error})
-    end.
-
-reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
-    State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
-
 %%--------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_server Callbacks
 %%--------------------------------------------------------------------
 
-init([WsPid, Req, ReplyChannel, PktOpts]) ->
-    %%issue#413: trap_exit is unnecessary
-    %%process_flag(trap_exit, true),
+init([MqttEnv, WsPid, Req, ReplyChannel]) ->
+    true = link(WsPid),
     {ok, Peername} = Req:get(peername),
+    Headers = mochiweb_headers:to_list(
+                mochiweb_request:get(headers, Req)),
+    PktOpts = proplists:get_value(packet, MqttEnv),
     SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
-    Headers = mochiweb_request:get(headers, Req),
-    HeadersList = mochiweb_headers:to_list(Headers),
     ProtoState = emqttd_protocol:init(Peername, SendFun,
-                                      [{ws_initial_headers, HeadersList} | PktOpts]),
-    {ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
+                                      [{ws_initial_headers, Headers} | PktOpts]),
+    {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),
+                         connection = Req:get(connection),
+                         proto_state = ProtoState}, idle_timeout(MqttEnv)}.
+
+idle_timeout(MqttEnv) ->
+    ClientOpts = proplists:get_value(client, MqttEnv),
+    timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)).
 
 handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
     {reply, emqttd_protocol:session(ProtoState), State};
 
-handle_call(info, _From, State = #wsclient_state{request     = Req,
+handle_call(info, _From, State = #wsclient_state{peer = Peer,
                                                  proto_state = ProtoState}) ->
     ProtoInfo = emqttd_protocol:info(ProtoState),
-    {reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State};
+    {reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State};
 
 handle_call(kick, _From, State) ->
     {stop, {shutdown, kick}, ok, State};
 
-handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) ->
-    ?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq),
+handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
+    ?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]),
     {reply, {error, unsupported_request}, State}.
 
 handle_cast({subscribe, TopicTable}, State) ->
@@ -140,13 +103,12 @@ handle_cast({unsubscribe, Topics}, State) ->
                    emqttd_session:unsubscribe(SessPid, Topics)
                  end, State);
 
-handle_cast({received, Packet}, State = #wsclient_state{request     = Req,
-                                                        proto_state = ProtoState}) ->
+handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
     case emqttd_protocol:received(Packet, ProtoState) of
         {ok, ProtoState1} ->
             noreply(State#wsclient_state{proto_state = ProtoState1});
         {error, Error} ->
-            ?WSLOG(error, "Protocol error - ~p", [Error], Req),
+            ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
             shutdown(Error, State);
         {error, Error, ProtoState1} ->
             shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
@@ -154,9 +116,12 @@ handle_cast({received, Packet}, State = #wsclient_state{request     = Req,
             stop(Reason, State#wsclient_state{proto_state = ProtoState1})
     end;
 
-handle_cast(Msg, State = #wsclient_state{request = Req}) ->
-    ?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req),
-    {noreply, State}.
+handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
+    ?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]),
+    noreply(State).
+
+handle_info(timeout, State) ->
+    shutdown(idle_timeout, State);
 
 handle_info({suback, PacketId, GrantedQos}, State) ->
     with_proto_state(fun(ProtoState) ->
@@ -174,13 +139,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
                        emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
                      end, State);
 
-handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) ->
-    ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req),
+handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
+    ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     shutdown(conflict, State);
 
-handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) ->
-    ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req),
-    Conn = Req:get(connection),
+handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) ->
+    ?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]),
     StatFun = fun() ->
         case Conn:getstat([recv_oct]) of
             {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
@@ -190,25 +154,21 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}
     KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
     noreply(State#wsclient_state{keepalive = KeepAlive});
 
-handle_info({keepalive, check}, State = #wsclient_state{request   = Req,
+handle_info({keepalive, check}, State = #wsclient_state{peer      = Peer,
                                                         keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of
         {ok, KeepAlive1} ->
             noreply(State#wsclient_state{keepalive = KeepAlive1});
         {error, timeout} ->
-            ?WSLOG(debug, "Keepalive Timeout!", [], Req),
+            ?WSLOG(debug, Peer, "Keepalive Timeout!", []),
             shutdown(keepalive_timeout, State);
         {error, Error} ->
-            ?WSLOG(warning, "Keepalive error - ~p", [Error], Req),
+            ?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]),
             shutdown(keepalive_error, State)
     end;
 
-%%issue#413: removed the trap_exit flag
-%%handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
-%%    stop(Reason, State);
-
-handle_info(Info, State = #wsclient_state{request = Req}) ->
-    ?WSLOG(critical, "Unexpected Info: ~p", [Info], Req),
+handle_info(Info, State = #wsclient_state{peer = Peer}) ->
+    ?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]),
     noreply(State).
 
 terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->

+ 45 - 0
src/emqttd_ws_client_sup.erl

@@ -0,0 +1,45 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_ws_client_sup).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-behavior(supervisor).
+
+-export([start_link/0, start_client/3]).
+
+-export([init/1]).
+
+%% @doc Start websocket client supervisor
+-spec(start_link() -> {ok, pid()}).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd:env(mqtt)]).
+
+%% @doc Start a WebSocket Client
+-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
+start_client(WsPid, Req, ReplyChannel) ->
+    supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
+
+%%--------------------------------------------------------------------
+%% Supervisor callbacks
+%%--------------------------------------------------------------------
+
+init([Env]) ->
+    {ok, {{simple_one_for_one, 0, 1},
+           [{ws_client, {emqttd_ws_client, start_link, [Env]},
+             temporary, 5000, worker, [emqttd_ws_client]}]}}.
+

+ 0 - 140
test/esockd_access.erl

@@ -1,140 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(esockd_access).
-
--type cidr() :: string().
-
--type rule() :: {allow, all} |
-                {allow, cidr()} |
-                {deny,  all} |
-                {deny,  cidr()}.
-
--type range() :: {cidr(), pos_integer(), pos_integer()}.
-
--export_type([cidr/0, range/0, rule/0]).
-
--type range_rule() :: {allow, all} |
-                      {allow, range()} |
-                      {deny,  all} |
-                      {deny,  range()}.
-
--export([rule/1, match/2, range/1,  mask/1, atoi/1, itoa/1]).
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Build CIDR, Make rule.
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec rule(rule()) -> range_rule().
-rule({allow, all}) ->
-    {allow, all};
-rule({allow, CIDR}) when is_list(CIDR) ->
-    rule(allow, CIDR);
-rule({deny, CIDR}) when is_list(CIDR) ->
-    rule(deny, CIDR);
-rule({deny, all}) ->
-    {deny, all}.
-
-rule(Type, CIDR) when is_list(CIDR) ->
-    {Start, End} = range(CIDR), {Type, {CIDR, Start, End}}.
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Match Addr with Access Rules.
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec match(inet:ip_address(), [range_rule()]) -> {matched, allow} | {matched, deny} | nomatch.
-match(Addr, Rules) when is_tuple(Addr) ->
-    match2(atoi(Addr), Rules).
-
-match2(_I, []) ->
-    nomatch;
-match2(_I, [{allow, all}|_]) ->
-    {matched, allow};
-match2(I, [{allow, {_, Start, End}}|_]) when I >= Start, I =< End ->
-    {matched, allow};
-match2(I, [{allow, {_, _Start, _End}}|Rules]) ->
-    match2(I, Rules);
-match2(I, [{deny, {_, Start, End}}|_]) when I >= Start, I =< End ->
-    {matched, deny};
-match2(I, [{deny, {_, _Start, _End}}|Rules]) ->
-    match2(I, Rules);
-match2(_I, [{deny, all}|_]) ->
-    {matched, deny}.
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% CIDR range.
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec range(cidr()) -> {pos_integer(), pos_integer()}.
-range(CIDR) ->
-    case string:tokens(CIDR, "/") of
-        [Addr] ->
-            {ok, IP} = inet:getaddr(Addr, inet),
-            {atoi(IP), atoi(IP)};
-        [Addr, Mask] ->
-            {ok, IP} = inet:getaddr(Addr, inet),
-            {Start, End} = subnet(IP, mask(list_to_integer(Mask))),
-            {Start, End}
-    end.
-
-subnet(IP, Mask) ->
-    Start = atoi(IP) band Mask,
-    End = Start bor (Mask bxor 16#FFFFFFFF),
-    {Start, End}.
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Mask Int
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec mask(0..32) -> 0..16#FFFFFFFF.
-mask(0) ->
-    16#00000000;
-mask(32) ->
-    16#FFFFFFFF;
-mask(N) when N >= 1, N =< 31 ->
-    lists:foldl(fun(I, Mask) -> (1 bsl I) bor Mask end, 0, lists:seq(32 - N, 31)).
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Addr to Integer.
-%%
-%% @end
-%%------------------------------------------------------------------------------
-atoi({A, B, C, D}) ->
-    (A bsl 24) + (B bsl 16) + (C bsl 8) + D.
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Integer to Addr.
-%%
-%% @end
-%%------------------------------------------------------------------------------
-itoa(I) ->
-    A = (I bsr 24) band 16#FF,
-    B = (I bsr 16) band 16#FF,
-    C = (I bsr 8)  band 16#FF,
-    D = I band 16#FF,
-    {A, B, C, D}.
-
-