|
@@ -134,7 +134,6 @@ websocket_init(#state{request = Req, options = Options}) ->
|
|
|
Zone = proplists:get_value(zone, Options),
|
|
Zone = proplists:get_value(zone, Options),
|
|
|
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
|
|
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
|
|
|
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
|
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
|
|
|
- lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
|
|
|
|
|
|
|
|
|
|
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
|
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
|
|
{ok, #state{peername = Peername,
|
|
{ok, #state{peername = Peername,
|
|
@@ -149,14 +148,14 @@ send_fun(WsPid) ->
|
|
|
Data = emqx_frame:serialize(Packet, Options),
|
|
Data = emqx_frame:serialize(Packet, Options),
|
|
|
BinSize = iolist_size(Data),
|
|
BinSize = iolist_size(Data),
|
|
|
emqx_metrics:trans(inc, 'bytes/sent', BinSize),
|
|
emqx_metrics:trans(inc, 'bytes/sent', BinSize),
|
|
|
- put(send_oct, get(send_oct) + BinSize),
|
|
|
|
|
- put(send_cnt, get(send_cnt) + 1),
|
|
|
|
|
|
|
+ emqx_pd:update_counter(send_cnt, 1),
|
|
|
|
|
+ emqx_pd:update_counter(send_oct, BinSize),
|
|
|
WsPid ! {binary, iolist_to_binary(Data)},
|
|
WsPid ! {binary, iolist_to_binary(Data)},
|
|
|
ok
|
|
ok
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
stat_fun() ->
|
|
stat_fun() ->
|
|
|
- fun() -> {ok, get(recv_oct)} end.
|
|
|
|
|
|
|
+ fun() -> {ok, emqx_pd:get_counter(recv_oct)} end.
|
|
|
|
|
|
|
|
websocket_handle({binary, <<>>}, State) ->
|
|
websocket_handle({binary, <<>>}, State) ->
|
|
|
{ok, ensure_stats_timer(State)};
|
|
{ok, ensure_stats_timer(State)};
|
|
@@ -164,16 +163,16 @@ websocket_handle({binary, [<<>>]}, State) ->
|
|
|
{ok, ensure_stats_timer(State)};
|
|
{ok, ensure_stats_timer(State)};
|
|
|
websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
proto_state = ProtoState}) ->
|
|
proto_state = ProtoState}) ->
|
|
|
- BinSize = iolist_size(Data),
|
|
|
|
|
- put(recv_oct, get(recv_oct) + BinSize),
|
|
|
|
|
?LOG(debug, "RECV ~p", [Data]),
|
|
?LOG(debug, "RECV ~p", [Data]),
|
|
|
|
|
+ BinSize = iolist_size(Data),
|
|
|
|
|
+ emqx_pd:update_counter(recv_oct, BinSize),
|
|
|
emqx_metrics:trans(inc, 'bytes/received', BinSize),
|
|
emqx_metrics:trans(inc, 'bytes/received', BinSize),
|
|
|
- case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
|
|
|
|
- {more, NewParserState} ->
|
|
|
|
|
- {ok, State#state{parser_state = NewParserState}};
|
|
|
|
|
|
|
+ try emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
|
|
|
|
+ {more, ParserState1} ->
|
|
|
|
|
+ {ok, State#state{parser_state = ParserState1}};
|
|
|
{ok, Packet, Rest} ->
|
|
{ok, Packet, Rest} ->
|
|
|
emqx_metrics:received(Packet),
|
|
emqx_metrics:received(Packet),
|
|
|
- put(recv_cnt, get(recv_cnt) + 1),
|
|
|
|
|
|
|
+ emqx_pd:update_counter(recv_cnt, 1),
|
|
|
case emqx_protocol:received(Packet, ProtoState) of
|
|
case emqx_protocol:received(Packet, ProtoState) of
|
|
|
{ok, ProtoState1} ->
|
|
{ok, ProtoState1} ->
|
|
|
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
|
|
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
|
|
@@ -187,9 +186,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
end;
|
|
end;
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
|
?LOG(error, "Frame error: ~p", [Error]),
|
|
?LOG(error, "Frame error: ~p", [Error]),
|
|
|
- shutdown(Error, State);
|
|
|
|
|
- {'EXIT', Reason} ->
|
|
|
|
|
- ?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]),
|
|
|
|
|
|
|
+ shutdown(Error, State)
|
|
|
|
|
+ catch
|
|
|
|
|
+ _:Error ->
|
|
|
|
|
+ ?LOG(error, "Frame error:~p~nFrame data: ~p", [Error, Data]),
|
|
|
shutdown(parse_error, State)
|
|
shutdown(parse_error, State)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
@@ -300,4 +300,5 @@ shutdown(Reason, State) ->
|
|
|
{stop, State#state{shutdown = Reason}}.
|
|
{stop, State#state{shutdown = Reason}}.
|
|
|
|
|
|
|
|
wsock_stats() ->
|
|
wsock_stats() ->
|
|
|
- [{Key, get(Key)} || Key <- ?SOCK_STATS].
|
|
|
|
|
|
|
+ [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
|
|
|
|
|
+
|