|
|
@@ -232,6 +232,7 @@ handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
|
|
Size = iolist_size(Data),
|
|
|
emqx_metrics:inc('bytes/received', Size),
|
|
|
Incoming = #{bytes => Size, packets => 0},
|
|
|
+ put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
|
|
|
|
|
|
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|
|
@@ -290,7 +291,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
%% Receive and parse data
|
|
|
handle_packet(<<>>, State) ->
|
|
|
- put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
|
|
|
|
|
|
handle_packet(Data, State = #state{proto_state = ProtoState,
|
|
|
@@ -298,7 +298,6 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
|
|
|
idle_timeout = IdleTimeout}) ->
|
|
|
case catch emqx_frame:parse(Data, ParserState) of
|
|
|
{more, NewParserState} ->
|
|
|
- put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
{noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
|
|
|
{ok, Packet = ?PACKET(Type), Rest} ->
|
|
|
emqx_metrics:received(Packet),
|