Przeglądaj źródła

websocket support

Feng Lee 11 lat temu
rodzic
commit
69611b234d

+ 2 - 1
apps/emqttd/priv/www/mqtt.html

@@ -20,7 +20,7 @@
         }
         }
 
 
         // Create a client instance
         // Create a client instance
-        client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt/wsocket", "clientId");
+        client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt", "clientId");
 
 
         // set callback handlers
         // set callback handlers
         client.onConnectionLost = onConnectionLost;
         client.onConnectionLost = onConnectionLost;
@@ -34,6 +34,7 @@
         // called when the client connects
         // called when the client connects
         function onConnect() {
         function onConnect() {
               alert("connected"),
               alert("connected"),
+              $('connstate').innerHTML = 'CONNECTED';
               // Once a connection has been made, make a subscription and send a message.
               // Once a connection has been made, make a subscription and send a message.
               console.log("onConnect");
               console.log("onConnect");
               client.subscribe("/World");
               client.subscribe("/World");

+ 1 - 1
apps/emqttd/src/emqttd.erl

@@ -71,7 +71,7 @@ open_listener({mqtts, Port, Options}) ->
 
 
 %% open http port
 %% open http port
 open_listener({http, Port, Options}) ->
 open_listener({http, Port, Options}) ->
-    MFArgs = {emqttd_http, handle_req, []},
+    MFArgs = {emqttd_http, handle_request, []},
 	mochiweb:start_http(Port, Options, MFArgs).
 	mochiweb:start_http(Port, Options, MFArgs).
 
 
 open_listener(Protocol, Port, Options) ->
 open_listener(Protocol, Port, Options) ->

+ 3 - 2
apps/emqttd/src/emqttd_client.erl

@@ -67,8 +67,9 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
     {ok, Peername} = emqttd_net:peername(Sock),
     {ok, Peername} = emqttd_net:peername(Sock),
     {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
     {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
     lager:info("Connect from ~s", [ConnStr]),
     lager:info("Connect from ~s", [ConnStr]),
+    SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
     ParserState = emqtt_parser:init(PacketOpts),
     ParserState = emqtt_parser:init(PacketOpts),
-    ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
+    ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
     State = control_throttle(#state{transport    = Transport,
     State = control_throttle(#state{transport    = Transport,
                                     socket       = NewSock,
                                     socket       = NewSock,
                                     peername     = Peername,
                                     peername     = Peername,
@@ -200,7 +201,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
             stop(Reason, State#state{proto_state = ProtoState1})
             stop(Reason, State#state{proto_state = ProtoState1})
         end;
         end;
     {error, Error} ->
     {error, Error} ->
-        lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
+        lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]),
         stop({shutdown, Error}, State)
         stop({shutdown, Error}, State)
     end.
     end.
 
 

+ 31 - 13
apps/emqttd/src/emqttd_http.erl

@@ -34,12 +34,15 @@
 
 
 -import(proplists, [get_value/2, get_value/3]).
 -import(proplists, [get_value/2, get_value/3]).
 
 
--export([handle_req/1]).
+-export([handle_request/1]).
 
 
-handle_req(Req) ->
-    handle_req(Req:get(method), Req:get(path), Req).
+handle_request(Req) ->
+    handle_request(Req:get(method), Req:get(path), Req).
 
 
-handle_req('POST', "/mqtt/publish", Req) ->
+%%------------------------------------------------------------------------------
+%% HTTP Publish API
+%%------------------------------------------------------------------------------
+handle_request('POST', "/mqtt/publish", Req) ->
     Params = mochiweb_request:parse_post(Req),
     Params = mochiweb_request:parse_post(Req),
     lager:info("HTTP Publish: ~p", [Params]),
     lager:info("HTTP Publish: ~p", [Params]),
 	case authorized(Req) of
 	case authorized(Req) of
@@ -64,21 +67,33 @@ handle_req('POST', "/mqtt/publish", Req) ->
 		Req:respond({401, [], <<"Fobbiden">>})
 		Req:respond({401, [], <<"Fobbiden">>})
 	end;
 	end;
 
 
-handle_req(_Method, "/mqtt/wsocket", Req) ->
+%%------------------------------------------------------------------------------
+%% MQTT Over WebSocket
+%%------------------------------------------------------------------------------
+handle_request('GET', "/mqtt", Req) ->
     lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
     lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
-    Up = Req:get_header_value("Upgrade"),
-    case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of
-        true ->
-            emqttd_websocket:start_link(Req);
-        false ->
-            Req:respond({400, [], <<"Bad Request">>})
+    Upgrade = Req:get_header_value("Upgrade"),
+    Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
+    case {is_websocket(Upgrade), Proto} of
+        {true, "mqtt" ++ _Vsn} ->
+            emqttd_ws_client:start_link(Req);
+        {false, _} ->
+            lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
+            Req:respond({400, [], <<"Bad Request">>});
+        {_, Proto} ->
+            lager:error("WebSocket with error Protocol: ~s", [Proto]),
+            Req:respond({400, [], <<"Bad WebSocket Protocol">>})
     end;
     end;
 
 
-handle_req('GET', "/" ++ File, Req) ->
+%%------------------------------------------------------------------------------
+%% Get static files
+%%------------------------------------------------------------------------------
+handle_request('GET', "/" ++ File, Req) ->
     lager:info("HTTP GET File: ~s", [File]),
     lager:info("HTTP GET File: ~s", [File]),
     mochiweb_request:serve_file(File, docroot(), Req);
     mochiweb_request:serve_file(File, docroot(), Req);
 
 
-handle_req(_Method, _Path, Req) ->
+handle_request(Method, Path, Req) ->
+    lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
 	Req:not_found().
 	Req:not_found().
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -113,6 +128,9 @@ int(S) -> list_to_integer(S).
 bool("0") -> false;
 bool("0") -> false;
 bool("1") -> true.
 bool("1") -> true.
 
 
+is_websocket(Upgrade) -> 
+    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
 docroot() ->
 docroot() ->
     {file, Here} = code:is_loaded(?MODULE),
     {file, Here} = code:is_loaded(?MODULE),
     Dir = filename:dirname(filename:dirname(Here)),
     Dir = filename:dirname(filename:dirname(Here)),

+ 8 - 18
apps/emqttd/src/emqttd_protocol.erl

@@ -34,7 +34,7 @@
 -include("emqttd.hrl").
 -include("emqttd.hrl").
 
 
 %% API
 %% API
--export([init/2, clientid/1]).
+-export([init/3, clientid/1]).
 
 
 -export([received/2, send/2, redeliver/2, shutdown/2]).
 -export([received/2, send/2, redeliver/2, shutdown/2]).
 
 
@@ -42,9 +42,8 @@
 
 
 %% Protocol State
 %% Protocol State
 -record(proto_state, {
 -record(proto_state, {
-        transport,
-        socket,
         peername,
         peername,
+        sendfun,
         connected = false, %received CONNECT action?
         connected = false, %received CONNECT action?
         proto_ver,
         proto_ver,
         proto_name,
         proto_name,
@@ -59,12 +58,12 @@
 
 
 -type proto_state() :: #proto_state{}.
 -type proto_state() :: #proto_state{}.
 
 
-init({Transport, Socket, Peername}, Opts) ->
+init(Peername, SendFun, Opts) ->
+    MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
 	#proto_state{
 	#proto_state{
-        transport        = Transport,
-		socket	         = Socket,
         peername         = Peername,
         peername         = Peername,
-        max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. 
+        sendfun          = SendFun,
+        max_clientid_len = MaxLen}. 
 
 
 clientid(#proto_state{clientid = ClientId}) -> ClientId.
 clientid(#proto_state{clientid = ClientId}) -> ClientId.
 
 
@@ -231,22 +230,13 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
     {Message1, NewSession} = emqttd_session:store(Session, Message),
     {Message1, NewSession} = emqttd_session:store(Session, Message),
 	send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
 	send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
 
 
-send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) ->
+send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
     trace(send, Packet, State),
     trace(send, Packet, State),
     sent_stats(Packet),
     sent_stats(Packet),
     Data = emqtt_serialiser:serialise(Packet),
     Data = emqtt_serialiser:serialise(Packet),
     lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
     lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
     emqttd_metrics:inc('bytes/sent', size(Data)),
     emqttd_metrics:inc('bytes/sent', size(Data)),
-    if
-        is_function(Transport) -> 
-            io:format("Transport Fun: ~p~n", [Transport]),
-            try  
-                Transport(Data)
-            catch
-                _:Error -> io:format("Transport send error: ~p~n", [Error])
-            end;
-        true -> Transport:send(Sock, Data)
-    end,
+    SendFun(Data),
     {ok, State}.
     {ok, State}.
 
 
 trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
 trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->

+ 0 - 171
apps/emqttd/src/emqttd_websocket.erl

@@ -1,171 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd websocket client.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
-
--module(emqttd_websocket).
-
--author("Feng Lee <feng@emqtt.io>").
-
--include_lib("emqtt/include/emqtt.hrl").
-
--include_lib("emqtt/include/emqtt_packet.hrl").
-
--behaviour(gen_server).
-
--define(SERVER, ?MODULE).
-
--export([start_link/1, ws_loop/3]).
-
-%% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--record(wsocket_state, {client, parser_state}).
-
--record(client_state, {request, reply_channel, proto_state, keepalive}).
-
--define(PACKET_OPTS, [{max_clientid_len, 1024},
-                      {max_packet_size,  4096}]).
-
-start_link(Req) ->
-    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
-    {ok, Client} = gen_server:start_link(?MODULE, [Req, ReplyChannel], []),
-    ReentryWs(#wsocket_state{client = Client,
-                             parser_state = emqtt_parser:init(?PACKET_OPTS)}).
-
-ws_loop(<<>>, State, _ReplyChannel) ->
-    State;
-ws_loop(Payload, State = #wsocket_state{client = Client, parser_state = ParserState}, ReplyChannel) ->
-    io:format("Received data: ~p~n", [Payload]),
-    case catch emqtt_parser:parse(iolist_to_binary(Payload), ParserState) of
-    {more, ParserState1} ->
-        State#wsocket_state{parser_state = ParserState1};
-    {ok, Packet, Rest} ->
-        Client ! {received, Packet},
-        ws_loop(Rest, State#wsocket_state{parser_state = emqtt_parser:init(?PACKET_OPTS)}, ReplyChannel);
-    {error, Error} ->
-        lager:error("MQTT detected framing error ~p~n", [Error]),
-        exit({shutdown, Error});
-    Exit ->
-        lager:error("MQTT detected error ~p~n", [Exit])
-    end.
-
-%%%=============================================================================
-%%% gen_server callbacks
-%%%=============================================================================
-
-init([Req, ReplyChannel]) ->
-    %%TODO: Redesign later...
-    Socket = Req:get(socket),
-    {ok, Peername} = emqttd_net:peername(Socket),
-    SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
-    ProtoState = emqttd_protocol:init({SendFun, Socket, Peername}, ?PACKET_OPTS),
-    {ok, #client_state{request = Req, reply_channel = ReplyChannel,
-                       proto_state = ProtoState}}.
-
-handle_call(_Req, _From, State) ->
-    {reply, error, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
-    io:format("Packet Received: ~p~n", [Packet]),
-    case emqttd_protocol:received(Packet, ProtoState) of
-    {ok, ProtoState1} ->
-        {noreply, State#client_state{proto_state = ProtoState1}};
-    {error, Error} ->
-        lager:error("MQTT protocol error ~p", [Error]),
-        stop({shutdown, Error}, State);
-    {error, Error, ProtoState1} ->
-        stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
-    {stop, Reason, ProtoState1} ->
-        stop(Reason, State#client_state{proto_state = ProtoState1})
-    end;
-
-%%TODO: ok??
-handle_info({dispatch, {From, Messages}}, #client_state{proto_state = ProtoState} = State) when is_list(Messages) ->
-    ProtoState1 =
-    lists:foldl(fun(Message, PState) ->
-            {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
-        end, ProtoState, Messages),
-    {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({dispatch, {From, Message}}, #client_state{proto_state = ProtoState} = State) ->
-    {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
-    {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) ->
-    {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
-    {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) ->
-    %% TODO: to...
-    %% need transfer data???
-    %% emqttd_client:transfer(NewPid, Data),
-    lager:error("Shutdown for duplicate clientid: ~s",
-                [emqttd_protocol:clientid(ProtoState)]), 
-    stop({shutdown, duplicate_id}, State);
-
-handle_info({keepalive, start, _TimeoutSec}, State = #client_state{}) ->
-    %lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
-    KeepAlive = undefined, %emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
-    {noreply, State#client_state{ keepalive = KeepAlive }};
-
-handle_info({keepalive, timeout}, State = #client_state{keepalive = KeepAlive}) ->
-    case emqttd_keepalive:resume(KeepAlive) of
-    timeout ->
-        %lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
-        stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
-    {resumed, KeepAlive1} ->
-        %lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
-        {noreply, State#client_state{keepalive = KeepAlive1}}
-    end;
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(Reason, #client_state{keepalive = KeepAlive, proto_state = ProtoState}) ->
-    lager:debug("~p terminated, reason: ~p~n", [self(), Reason]),
-    emqttd_keepalive:cancel(KeepAlive),
-    case {ProtoState, Reason} of
-        {undefined, _} -> ok;
-        {_, {shutdown, Error}} -> 
-            emqttd_protocol:shutdown(Error, ProtoState);
-        {_, _} -> 
-            ok
-    end.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%%=============================================================================
-%%% Internal functions
-%%%=============================================================================
-
-stop(Reason, State ) ->
-    {stop, Reason, State}.
-

+ 192 - 0
apps/emqttd/src/emqttd_ws_client.erl

@@ -0,0 +1,192 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd websocket client.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+
+-module(emqttd_ws_client).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-include_lib("emqtt/include/emqtt.hrl").
+
+-include_lib("emqtt/include/emqtt_packet.hrl").
+
+-behaviour(gen_server).
+
+%% API Exports
+-export([start_link/1, ws_loop/3]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% WebSocket loop state
+-record(wsocket_state, {request,
+                        client_pid,
+                        packet_opts,
+                        parser_state}).
+
+%% Client state
+-record(state, {ws_pid, request, proto_state, keepalive}).
+
+%%------------------------------------------------------------------------------
+%% @doc Start WebSocket client.
+%% @end
+%%------------------------------------------------------------------------------
+start_link(Req) ->
+    {ReentryWs, ReplyChannel} = upgrade(Req),
+    {ok, PktOpts} = application:get_env(emqttd, mqtt_packet),
+    {ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []),
+    ReentryWs(#wsocket_state{request      = Req,
+                             client_pid   = ClientPid,
+                             packet_opts  = PktOpts,
+                             parser_state = emqtt_parser:init(PktOpts)}).
+
+%%------------------------------------------------------------------------------
+%% @private
+%% @doc Start WebSocket client.
+%% @end
+%%------------------------------------------------------------------------------
+upgrade(Req) ->
+    mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
+
+%%------------------------------------------------------------------------------
+%% @doc WebSocket frame receive loop.
+%% @end
+%%------------------------------------------------------------------------------
+ws_loop(<<>>, State, _ReplyChannel) ->
+    State;
+ws_loop([<<>>], State, _ReplyChannel) ->
+    State;
+ws_loop(Data, State = #wsocket_state{request = Req,
+                                     client_pid = ClientPid,
+                                     parser_state = ParserState}, ReplyChannel) ->
+    Peer = Req:get(peer),
+    lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
+    case emqtt_parser:parse(iolist_to_binary(Data), ParserState) of
+    {more, ParserState1} ->
+        State#wsocket_state{parser_state = ParserState1};
+    {ok, Packet, Rest} ->
+        gen_server:cast(ClientPid, {received, Packet}),
+        ws_loop(Rest, reset_parser(State), ReplyChannel);
+    {error, Error} ->
+        lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]),
+        exit({shutdown, Error})
+    end.
+
+reset_parser(State = #wsocket_state{packet_opts  = PktOpts}) ->
+    State#wsocket_state{parser_state = emqtt_parser:init(PktOpts)}.
+
+%%%=============================================================================
+%%% gen_fsm callbacks
+%%%=============================================================================
+
+init([WsPid, Req, ReplyChannel, PktOpts]) ->
+    process_flag(trap_exit, true),
+    Socket = Req:get(socket),
+    {ok, Peername} = emqttd_net:peername(Socket),
+    SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
+    ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
+    {ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
+
+handle_call(_Req, _From, State) ->
+    {reply, error, State}.
+
+handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) ->
+    case emqttd_protocol:received(Packet, ProtoState) of
+    {ok, ProtoState1} ->
+        {noreply, State#state{proto_state = ProtoState1}};
+    {error, Error} ->
+        lager:error("MQTT protocol error ~p", [Error]),
+        stop({shutdown, Error}, State);
+    {error, Error, ProtoState1} ->
+        stop({shutdown, Error}, State#state{proto_state = ProtoState1});
+    {stop, Reason, ProtoState1} ->
+        stop(Reason, State#state{proto_state = ProtoState1})
+    end;
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
+    ProtoState1 =
+    lists:foldl(fun(Message, PState) ->
+            {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
+        end, ProtoState, Messages),
+    {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
+    {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
+    {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
+    {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
+    {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) ->
+    lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), 
+    stop({shutdown, duplicate_id}, State);
+
+handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) ->
+    lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
+    %%TODO: fix esockd_transport...
+    KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
+                                     TimeoutSec, {keepalive, timeout}),
+    {noreply, State#state{keepalive = KeepAlive}};
+
+handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) ->
+    case emqttd_keepalive:resume(KeepAlive) of
+    timeout ->
+        lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
+        stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
+    {resumed, KeepAlive1} ->
+        lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
+        {noreply, State#state{keepalive = KeepAlive1}}
+    end;
+
+handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) ->
+    stop(Reason, State);
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) ->
+    emqttd_keepalive:cancel(KeepAlive),
+    case Reason of
+        {shutdown, Error} ->
+            emqttd_protocol:shutdown(Error, ProtoState);
+        _ -> ok
+    end.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+stop(Reason, State ) ->
+    {stop, Reason, State}.
+