Przeglądaj źródła

Use the new emqttd_parser API to parse Websocket frame

Feng Lee 9 lat temu
rodzic
commit
88c2b4eaa3
1 zmienionych plików z 24 dodań i 24 usunięć
  1. 24 24
      src/emqttd_ws.erl

+ 24 - 24
src/emqttd_ws.erl

@@ -18,13 +18,18 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
+-include("emqttd_protocol.hrl").
+
+-import(proplists, [get_value/3]).
+
 -export([handle_request/1, ws_loop/3]).
 
 %% WebSocket Loop State
--record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}).
+-record(wsocket_state, {peername, client_pid, max_packet_size, parser}).
 
--define(WSLOG(Level, Peer, Format, Args),
-        lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
+-define(WSLOG(Level, Format, Args, State),
+              lager:Level("WsClient(~s): " ++ Format,
+                          [esockd_net:format(State#wsocket_state.peername) | Args])).
 
 %%--------------------------------------------------------------------
 %% Handle WebSocket Request
@@ -32,18 +37,14 @@
 
 %% @doc Handle WebSocket Request.
 handle_request(Req) ->
-    Peer = Req:get(peer),
-    {ok, PktOpts} = emqttd:env(protocol),
-    ParserFun = emqttd_parser:new(PktOpts),
-    {ReentryWs, ReplyChannel} = upgrade(Req),
+    {ok, Env} = emqttd:env(protocol),
+    PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
+    Parser = emqttd_parser:initial_state(PacketSize),
+    %% Upgrade WebSocket.
+    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
     {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
-    ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid,
-                             packet_opts = PktOpts, parser_fun = ParserFun}).
-
-%% @doc Upgrade WebSocket.
-%% @private
-upgrade(Req) ->
-    mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
+    ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
+                             max_packet_size = PacketSize, client_pid = ClientPid}).
 
 %%--------------------------------------------------------------------
 %% Receive Loop
@@ -54,25 +55,24 @@ ws_loop(<<>>, State, _ReplyChannel) ->
     State;
 ws_loop([<<>>], State, _ReplyChannel) ->
     State;
-ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
-                                     parser_fun = ParserFun}, ReplyChannel) ->
-    ?WSLOG(debug, Peer, "RECV ~p", [Data]),
+ws_loop(Data, State = #wsocket_state{client_pid = ClientPid, parser = Parser}, ReplyChannel) ->
+    ?WSLOG(debug, "RECV ~p", [Data], State),
     emqttd_metrics:inc('bytes/received', iolist_size(Data)),
-    case catch ParserFun(iolist_to_binary(Data)) of
+    case catch emqttd_parser:parse(iolist_to_binary(Data), Parser) of
         {more, NewParser} ->
-            State#wsocket_state{parser_fun = NewParser};
+            State#wsocket_state{parser = NewParser};
         {ok, Packet, Rest} ->
             gen_server:cast(ClientPid, {received, Packet}),
             ws_loop(Rest, reset_parser(State), ReplyChannel);
         {error, Error} ->
-            ?WSLOG(error, Peer, "Frame error: ~p", [Error]),
+            ?WSLOG(error, "Frame error: ~p", [Error], State),
             exit({shutdown, Error});
         {'EXIT', Reason} ->
-            ?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
-            ?WSLOG(error, Peer, "Error data: ~p", [Data]),
+            ?WSLOG(error, "Frame error: ~p", [Reason], State),
+            ?WSLOG(error, "Error data: ~p", [Data], State),
             exit({shutdown, parser_error})
     end.
 
-reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
-    State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
+reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) ->
+    State#wsocket_state{parser = emqttd_parser:initial_state(PacketSize)}.