Explorar o código

fix: keep alive check

According to MQTT spec, MQTT Server should check a complete MQTT message
recv in last keep-alive time frame instead of number of received bytes
from the socket.

This commit change to check the recv pkt counter from process dict
instead.

Also it could save some calls to erlang port.
William Yang %!s(int64=3) %!d(string=hai) anos
pai
achega
01b9115fd8

+ 3 - 8
apps/emqx/src/emqx_connection.erl

@@ -708,8 +708,6 @@ handle_timeout(
     TRef,
     TRef,
     keepalive,
     keepalive,
     State = #state{
     State = #state{
-        transport = Transport,
-        socket = Socket,
         channel = Channel
         channel = Channel
     }
     }
 ) ->
 ) ->
@@ -717,12 +715,9 @@ handle_timeout(
         disconnected ->
         disconnected ->
             {ok, State};
             {ok, State};
         _ ->
         _ ->
-            case Transport:getstat(Socket, [recv_oct]) of
-                {ok, [{recv_oct, RecvOct}]} ->
-                    handle_timeout(TRef, {keepalive, RecvOct}, State);
-                {error, Reason} ->
-                    handle_info({sock_error, Reason}, State)
-            end
+            %% recv_pkt: valid MQTT message
+            RecvCnt = emqx_pd:get_counter(recv_pkt),
+            handle_timeout(TRef, {keepalive, RecvCnt}, State)
     end;
     end;
 handle_timeout(TRef, Msg, State) ->
 handle_timeout(TRef, Msg, State) ->
     with_channel(handle_timeout, [TRef, Msg], State).
     with_channel(handle_timeout, [TRef, Msg], State).

+ 0 - 5
apps/emqx/test/emqx_connection_SUITE.erl

@@ -316,11 +316,6 @@ t_handle_timeout(_) ->
         emqx_connection:handle_timeout(TRef, keepalive, State)
         emqx_connection:handle_timeout(TRef, keepalive, State)
     ),
     ),
 
 
-    ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
-    ?assertMatch(
-        {stop, {shutdown, for_testing}, _NState},
-        emqx_connection:handle_timeout(TRef, keepalive, State)
-    ),
     ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
     ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
 
 
 t_parse_incoming(_) ->
 t_parse_incoming(_) ->