Feng Lee 10 年 前
コミット
afa0c1819b
1 ファイル変更33 行追加25 行削除
  1. 33 25
      apps/emqttd/src/emqttd_ws_client.erl

+ 33 - 25
apps/emqttd/src/emqttd_ws_client.erl

@@ -49,7 +49,10 @@
                         parser_state}).
 
 %% Client state
--record(state, {ws_pid, request, proto_state, keepalive}).
+-record(client_state, {ws_pid,
+                       request,
+                       proto_state,
+                       keepalive}).
 
 %%------------------------------------------------------------------------------
 %% @doc Start WebSocket client.
@@ -109,80 +112,85 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
     {ok, Peername} = emqttd_net:peername(Socket),
     SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
     ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
-    {ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
+    {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
 
 handle_call(_Req, _From, State) ->
     {reply, error, State}.
 
-handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) ->
+handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
     case emqttd_protocol:received(Packet, ProtoState) of
     {ok, ProtoState1} ->
-        {noreply, State#state{proto_state = ProtoState1}};
+        {noreply, State#client_state{proto_state = ProtoState1}};
     {error, Error} ->
         lager:error("MQTT protocol error ~p", [Error]),
         stop({shutdown, Error}, State);
     {error, Error, ProtoState1} ->
-        stop({shutdown, Error}, State#state{proto_state = ProtoState1});
+        stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
     {stop, Reason, ProtoState1} ->
-        stop(Reason, State#state{proto_state = ProtoState1})
+        stop(Reason, State#client_state{proto_state = ProtoState1})
     end;
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
+handle_info({dispatch, {From, Messages}}, #client_state{proto_state = ProtoState} = State)
+    when is_list(Messages) ->
     ProtoState1 =
     lists:foldl(fun(Message, PState) ->
             {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
         end, ProtoState, Messages),
-    {noreply, State#state{proto_state = ProtoState1}};
+    {noreply, State#client_state{proto_state = ProtoState1}};
 
-handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
+handle_info({dispatch, {From, Message}}, #client_state{proto_state = ProtoState} = State) ->
     {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}};
+    {noreply, State#client_state{proto_state = ProtoState1}};
 
-handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
+handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) ->
     {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}};
+    {noreply, State#client_state{proto_state = ProtoState1}};
 
-handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
+handle_info({subscribe, Topic, Qos}, #client_state{proto_state = ProtoState} = State) ->
     {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}};
+    {noreply, State#client_state{proto_state = ProtoState1}};
 
-handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) ->
+handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) ->
     lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), 
     stop({shutdown, duplicate_id}, State);
 
-handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) ->
+handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) ->
     lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
     %%TODO: fix esockd_transport...
     KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
                                      TimeoutSec, {keepalive, timeout}),
-    {noreply, State#state{keepalive = KeepAlive}};
+    {noreply, State#client_state{keepalive = KeepAlive}};
 
-handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) ->
+handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
     case emqttd_keepalive:resume(KeepAlive) of
     timeout ->
         lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
-        stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
+        stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
     {resumed, KeepAlive1} ->
         lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
-        {noreply, State#state{keepalive = KeepAlive1}}
+        {noreply, State#client_state{keepalive = KeepAlive1}}
     end;
 
-handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) ->
-    stop(Reason, State);
+handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) ->
+    ClientId = emqttd_protocol:clientid(ProtoState),
+    lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]),
+    stop({shutdown, websocket_closed}, State);
 
-handle_info(Info, State = #state{request = Req}) ->
+handle_info(Info, State = #client_state{request = Req}) ->
     lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
     {noreply, State}.
 
-terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) ->
+terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
+    lager:info("WebSocket client terminated: ~p", [Reason]),
     emqttd_keepalive:cancel(KeepAlive),
     case Reason of
         {shutdown, Error} ->
             emqttd_protocol:shutdown(Error, ProtoState);
-        _ -> ok
+        _ ->
+            emqttd_protocol:shutdown(Reason, ProtoState)
     end.
 
 code_change(_OldVsn, State, _Extra) ->