|
|
@@ -14,8 +14,11 @@
|
|
|
|
|
|
-module(emqx_ws_connection).
|
|
|
|
|
|
+-define(LOG_HEADER, "[WS]").
|
|
|
+
|
|
|
-include("emqx.hrl").
|
|
|
-include("emqx_mqtt.hrl").
|
|
|
+-include("logger.hrl").
|
|
|
|
|
|
-export([info/1, attrs/1]).
|
|
|
-export([stats/1]).
|
|
|
@@ -45,12 +48,6 @@
|
|
|
|
|
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
|
|
|
|
|
--define(WSLOG(Level, Format, Args),
|
|
|
- emqx_logger:Level(#{header => "[WS] ", format => Format, args => Args},
|
|
|
- #{report_cb =>
|
|
|
- fun(#{header := Hdr0, format := Fmt0, args := Args0}) ->
|
|
|
- {Hdr0 ++ Fmt0, Args0}
|
|
|
- end})).
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -169,7 +166,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
proto_state = ProtoState}) ->
|
|
|
BinSize = iolist_size(Data),
|
|
|
put(recv_oct, get(recv_oct) + BinSize),
|
|
|
- ?WSLOG(debug, "RECV ~p", [Data]),
|
|
|
+ ?LOG(debug, "RECV ~p", [Data]),
|
|
|
emqx_metrics:inc('bytes/received', BinSize),
|
|
|
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
|
|
{more, NewParserState} ->
|
|
|
@@ -181,7 +178,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
{ok, ProtoState1} ->
|
|
|
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
|
|
|
{error, Error} ->
|
|
|
- ?WSLOG(error, "Protocol error - ~p", [Error]),
|
|
|
+ ?LOG(error, "Protocol error - ~p", [Error]),
|
|
|
stop(Error, State);
|
|
|
{error, Reason, ProtoState1} ->
|
|
|
shutdown(Reason, State#state{proto_state = ProtoState1});
|
|
|
@@ -189,10 +186,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
stop(Error, State#state{proto_state = ProtoState1})
|
|
|
end;
|
|
|
{error, Error} ->
|
|
|
- ?WSLOG(error, "Frame error: ~p", [Error]),
|
|
|
+ ?LOG(error, "Frame error: ~p", [Error]),
|
|
|
stop(Error, State);
|
|
|
{'EXIT', Reason} ->
|
|
|
- ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]),
|
|
|
+ ?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]),
|
|
|
shutdown(parse_error, State)
|
|
|
end.
|
|
|
|
|
|
@@ -230,12 +227,12 @@ websocket_info({timeout, Timer, emit_stats},
|
|
|
{ok, State#state{stats_timer = undefined}, hibernate};
|
|
|
|
|
|
websocket_info({keepalive, start, Interval}, State) ->
|
|
|
- ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval]),
|
|
|
+ ?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
|
|
|
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
|
|
|
{ok, KeepAlive} ->
|
|
|
{ok, State#state{keepalive = KeepAlive}};
|
|
|
{error, Error} ->
|
|
|
- ?WSLOG(warning, "Keepalive error - ~p", [Error]),
|
|
|
+ ?LOG(warning, "Keepalive error - ~p", [Error]),
|
|
|
shutdown(Error, State)
|
|
|
end;
|
|
|
|
|
|
@@ -244,19 +241,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
|
|
{ok, KeepAlive1} ->
|
|
|
{ok, State#state{keepalive = KeepAlive1}};
|
|
|
{error, timeout} ->
|
|
|
- ?WSLOG(debug, "Keepalive Timeout!", []),
|
|
|
+ ?LOG(debug, "Keepalive Timeout!", []),
|
|
|
shutdown(keepalive_timeout, State);
|
|
|
{error, Error} ->
|
|
|
- ?WSLOG(warning, "Keepalive error - ~p", [Error]),
|
|
|
+ ?LOG(warning, "Keepalive error - ~p", [Error]),
|
|
|
shutdown(keepalive_error, State)
|
|
|
end;
|
|
|
|
|
|
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
|
|
|
- ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
|
|
|
+ ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
|
|
|
shutdown(discard, State);
|
|
|
|
|
|
websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
|
|
- ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
|
|
|
+ ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
|
|
|
shutdown(conflict, State);
|
|
|
|
|
|
websocket_info({binary, Data}, State) ->
|
|
|
@@ -266,13 +263,13 @@ websocket_info({shutdown, Reason}, State) ->
|
|
|
shutdown(Reason, State);
|
|
|
|
|
|
websocket_info(Info, State) ->
|
|
|
- ?WSLOG(error, "unexpected info: ~p", [Info]),
|
|
|
+ ?LOG(error, "unexpected info: ~p", [Info]),
|
|
|
{ok, State}.
|
|
|
|
|
|
terminate(SockError, _Req, #state{keepalive = Keepalive,
|
|
|
proto_state = ProtoState,
|
|
|
shutdown = Shutdown}) ->
|
|
|
- ?WSLOG(debug, "Terminated for ~p, sockerror: ~p",
|
|
|
+ ?LOG(debug, "Terminated for ~p, sockerror: ~p",
|
|
|
[Shutdown, SockError]),
|
|
|
emqx_keepalive:cancel(Keepalive),
|
|
|
case {ProtoState, Shutdown} of
|