Browse Source

Revert "Calculate the 1.5 keep alive time exactly"

turtleDeng 7 years ago
parent
commit
8653732bae

+ 1 - 1
Makefile

@@ -37,7 +37,7 @@ EUNIT_OPTS = verbose
 
 CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \
 			emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
-			emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_pqueue emqx_router emqx_sm \
+			emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
 		 	emqx_mountpoint emqx_listeners emqx_protocol emqx_pool
 

+ 2 - 2
etc/emqx.conf

@@ -597,10 +597,10 @@ zone.external.force_gc_policy = 1000|1MB
 ## zone.external.server_keepalive = 0
 
 ## The backoff for MQTT keepalive timeout. The broker will kick a connection out
-## until 'Keepalive * backoff' timeout.
+## until 'Keepalive * backoff * 2' timeout.
 ##
 ## Value: Float > 0.5
-zone.external.keepalive_backoff = 1.5
+zone.external.keepalive_backoff = 0.75
 
 ## Maximum number of subscriptions allowed, 0 means no limit.
 ##

+ 1 - 1
priv/emqx.schema

@@ -748,7 +748,7 @@ end}.
 
 %% @doc Keepalive backoff
 {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
-  {default, 1.5},
+  {default, 0.75},
   {datatype, float}
 ]}.
 

+ 9 - 4
src/emqx_connection.erl

@@ -249,7 +249,6 @@ handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
     Size = iolist_size(Data),
     emqx_metrics:inc('bytes/received', Size),
     Incoming = #{bytes => Size, packets => 0},
-    put(last_packet_ts, erlang:system_time(millisecond)),
     handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
 
 handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
@@ -261,9 +260,15 @@ handle_info({inet_reply, _Sock, ok}, State) ->
 handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
     shutdown(Reason, State);
 
-handle_info({keepalive, start, Interval}, State) ->
+handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) ->
     ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
-    case emqx_keepalive:start(Interval, {keepalive, check}) of
+    StatFun = fun() ->
+                case Transport:getstat(Socket, [recv_oct]) of
+                    {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
+                    Error                       -> Error
+                end
+             end,
+    case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
         {ok, KeepAlive} ->
             {noreply, State#state{keepalive = KeepAlive}};
         {error, Error} ->
@@ -271,7 +276,7 @@ handle_info({keepalive, start, Interval}, State) ->
     end;
 
 handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
-    case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of
+    case emqx_keepalive:check(KeepAlive) of
         {ok, KeepAlive1} ->
             {noreply, State#state{keepalive = KeepAlive1}};
         {error, timeout} ->

+ 35 - 21
src/emqx_keepalive.erl

@@ -14,37 +14,51 @@
 
 -module(emqx_keepalive).
 
--export([start/2, check/2, cancel/1]).
+-export([start/3, check/1, cancel/1]).
 
--record(keepalive, {tmsec, tmsg, tref}).
+-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
 
 -type(keepalive() :: #keepalive{}).
 
 -export_type([keepalive/0]).
 
--define(SWEET_SPOT, 50).    % 50ms
-
 %% @doc Start a keepalive
--spec(start(integer(), any()) -> {ok, keepalive()}).
-start(0, _) ->
+-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}).
+start(_, 0, _) ->
     {ok, #keepalive{}};
-start(TimeoutSec, TimeoutMsg) ->
-    {ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}.
+start(StatFun, TimeoutSec, TimeoutMsg) ->
+    case catch StatFun() of
+        {ok, StatVal} ->
+            {ok, #keepalive{statfun = StatFun, statval = StatVal,
+                            tsec = TimeoutSec, tmsg = TimeoutMsg,
+                            tref = timer(TimeoutSec, TimeoutMsg)}};
+        {error, Error} ->
+            {error, Error};
+        {'EXIT', Reason} ->
+            {error, Reason}
+    end.
 
 %% @doc Check keepalive, called when timeout...
--spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}).
-check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) ->
-    TimeDiff = erlang:system_time(millisecond) - LastPacketTs,
-    case TimeDiff >= TimeoutMs of
-        true ->
-            {error, timeout};
-        false ->
-            {ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)}
+-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}).
+check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
+    case catch StatFun() of
+        {ok, NewVal} ->
+            if NewVal =/= LastVal ->
+                    {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
+                Repeat < 1 ->
+                    {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
+                true ->
+                    {error, timeout}
+            end;
+        {error, Error} ->
+            {error, Error};
+        {'EXIT', Reason} ->
+            {error, Reason}
     end.
 
--spec(resume(keepalive(), integer()) -> keepalive()).
-resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) ->
-    KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}.
+-spec(resume(keepalive()) -> keepalive()).
+resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
+    KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
 
 %% @doc Cancel Keepalive
 -spec(cancel(keepalive()) -> ok).
@@ -53,6 +67,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) ->
 cancel(_) ->
     ok.
 
-timer(Millisecond, Msg) ->
-    erlang:send_after(Millisecond, self(), Msg).
+timer(Secs, Msg) ->
+    erlang:send_after(timer:seconds(Secs), self(), Msg).
 

+ 1 - 1
src/emqx_protocol.erl

@@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
                                      proto_name = Name}, _PState) ->
     case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
         true  -> ok;
-        false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
+        false -> {error, ?RC_PROTOCOL_ERROR}
     end.
 
 %% MQTT3.1 does not allow null clientId

+ 5 - 5
src/emqx_ws_connection.erl

@@ -152,11 +152,12 @@ send_fun(WsPid) ->
         WsPid ! {binary, iolist_to_binary(Data)}
     end.
 
+stat_fun() ->
+    fun() -> {ok, get(recv_oct)} end.
+
 websocket_handle({binary, <<>>}, State) ->
-    put(last_packet_ts, erlang:system_time(millisecond)),
     {ok, ensure_stats_timer(State)};
 websocket_handle({binary, [<<>>]}, State) ->
-    put(last_packet_ts, erlang:system_time(millisecond)),
     {ok, ensure_stats_timer(State)};
 websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
                                                 proto_state  = ProtoState}) ->
@@ -166,7 +167,6 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
     emqx_metrics:inc('bytes/received', BinSize),
     case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
         {more, NewParserState} ->
-            put(last_packet_ts, erlang:system_time(millisecond)),
             {ok, State#state{parser_state = NewParserState}};
         {ok, Packet, Rest} ->
             emqx_metrics:received(Packet),
@@ -225,7 +225,7 @@ websocket_info({timeout, Timer, emit_stats},
 
 websocket_info({keepalive, start, Interval}, State) ->
     ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
-    case emqx_keepalive:start(Interval, {keepalive, check}) of
+    case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
         {ok, KeepAlive} ->
             {ok, State#state{keepalive = KeepAlive}};
         {error, Error} ->
@@ -234,7 +234,7 @@ websocket_info({keepalive, start, Interval}, State) ->
     end;
 
 websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
-    case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of
+    case emqx_keepalive:check(KeepAlive) of
         {ok, KeepAlive1} ->
             {ok, State#state{keepalive = KeepAlive1}};
         {error, timeout} ->

+ 7 - 8
test/emqx_keepalive_SUITE.erl

@@ -26,18 +26,17 @@ groups() -> [{keepalive, [], [t_keepalive]}].
 %%--------------------------------------------------------------------
 
 t_keepalive(_) ->
-    {ok, KA} = emqx_keepalive:start(1, {keepalive, timeout}),
-    resumed = keepalive_recv(KA, 100),
-    timeout = keepalive_recv(KA, 2000).
+    {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
+    [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])).
 
-keepalive_recv(KA, MockInterval) ->
+keepalive_recv(KA, Acc) ->
     receive
         {keepalive, timeout} ->
-            case emqx_keepalive:check(KA, erlang:system_time(millisecond) - MockInterval) of
-                {ok, _} -> resumed;
-                {error, timeout} -> timeout
+            case emqx_keepalive:check(KA) of
+                {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]);
+                {error, timeout} -> [timeout | Acc]
             end
         after 4000 ->
-                error
+                Acc
     end.
 

+ 43 - 0
test/emqx_net_SUITE.erl

@@ -0,0 +1,43 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_net_SUITE).
+
+%% CT
+-compile(export_all).
+-compile(nowarn_export_all).
+
+all() -> [{group, keepalive}].
+
+groups() -> [{keepalive, [], [t_keepalive]}].
+
+%%--------------------------------------------------------------------
+%% Keepalive
+%%--------------------------------------------------------------------
+
+t_keepalive(_) ->
+    {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
+    [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])).
+
+keepalive_recv(KA, Acc) ->
+    receive
+        {keepalive, timeout} ->
+            case emqx_keepalive:check(KA) of
+                {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]);
+                {error, timeout} -> [timeout | Acc]
+            end
+        after 4000 ->
+                Acc
+    end.
+