|
@@ -152,12 +152,11 @@ send_fun(WsPid) ->
|
|
|
WsPid ! {binary, iolist_to_binary(Data)}
|
|
WsPid ! {binary, iolist_to_binary(Data)}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-stat_fun() ->
|
|
|
|
|
- fun() -> {ok, get(recv_oct)} end.
|
|
|
|
|
-
|
|
|
|
|
websocket_handle({binary, <<>>}, State) ->
|
|
websocket_handle({binary, <<>>}, State) ->
|
|
|
|
|
+ put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
{ok, ensure_stats_timer(State)};
|
|
{ok, ensure_stats_timer(State)};
|
|
|
websocket_handle({binary, [<<>>]}, State) ->
|
|
websocket_handle({binary, [<<>>]}, State) ->
|
|
|
|
|
+ put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
{ok, ensure_stats_timer(State)};
|
|
{ok, ensure_stats_timer(State)};
|
|
|
websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
proto_state = ProtoState}) ->
|
|
proto_state = ProtoState}) ->
|
|
@@ -167,6 +166,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|
|
emqx_metrics:inc('bytes/received', BinSize),
|
|
emqx_metrics:inc('bytes/received', BinSize),
|
|
|
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
|
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
|
|
{more, NewParserState} ->
|
|
{more, NewParserState} ->
|
|
|
|
|
+ put(last_packet_ts, erlang:system_time(millisecond)),
|
|
|
{ok, State#state{parser_state = NewParserState}};
|
|
{ok, State#state{parser_state = NewParserState}};
|
|
|
{ok, Packet, Rest} ->
|
|
{ok, Packet, Rest} ->
|
|
|
emqx_metrics:received(Packet),
|
|
emqx_metrics:received(Packet),
|
|
@@ -225,7 +225,7 @@ websocket_info({timeout, Timer, emit_stats},
|
|
|
|
|
|
|
|
websocket_info({keepalive, start, Interval}, State) ->
|
|
websocket_info({keepalive, start, Interval}, State) ->
|
|
|
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
|
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
|
|
- case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
|
|
|
|
|
|
|
+ case emqx_keepalive:start(Interval, {keepalive, check}) of
|
|
|
{ok, KeepAlive} ->
|
|
{ok, KeepAlive} ->
|
|
|
{ok, State#state{keepalive = KeepAlive}};
|
|
{ok, State#state{keepalive = KeepAlive}};
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
@@ -234,7 +234,7 @@ websocket_info({keepalive, start, Interval}, State) ->
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
|
websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
|
|
- case emqx_keepalive:check(KeepAlive) of
|
|
|
|
|
|
|
+ case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of
|
|
|
{ok, KeepAlive1} ->
|
|
{ok, KeepAlive1} ->
|
|
|
{ok, State#state{keepalive = KeepAlive1}};
|
|
{ok, State#state{keepalive = KeepAlive1}};
|
|
|
{error, timeout} ->
|
|
{error, timeout} ->
|