Просмотр исходного кода

fix(stomp): backoff outgoung hear-beat timer interval

JianBo He 4 лет назад
Родитель
Сommit
e4e8590a77

+ 10 - 1
apps/emqx_stomp/src/emqx_stomp_connection.erl

@@ -43,7 +43,7 @@
         ]).
 
 %% for protocol
--export([send/4, heartbeat/2]).
+-export([send/4, heartbeat/2, statfun/3]).
 
 %% for mgmt
 -export([call/2, call/3]).
@@ -157,6 +157,7 @@ init_state(Transport, Socket, ProtoEnv) ->
     {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
 
     SendFun = {fun ?MODULE:send/4, [Transport, Socket, self()]},
+    StatFun = {fun ?MODULE:statfun/3, [Transport, Socket]},
     HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Socket]},
     Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
 
@@ -168,6 +169,7 @@ init_state(Transport, Socket, ProtoEnv) ->
                  peername => Peername,
                  sockname => Sockname,
                  peercert => Peercert,
+                 statfun  => StatFun,
                  sendfun  => SendFun,
                  heartfun => HrtBtFun,
                  conn_mod => ?MODULE
@@ -218,8 +220,15 @@ send(Frame, Transport, Sock, ConnPid) ->
     end.
 
 heartbeat(Transport, Sock) ->
+    ?LOG(debug, "SEND heartbeat: \\n"),
     Transport:send(Sock, <<$\n>>).
 
+statfun(Stat, Transport, Sock) ->
+    case Transport:getstat(Sock, [Stat]) of
+        {ok, [{Stat, Val}]} -> {ok, Val};
+        {error, Error}      -> {error, Error}
+    end.
+
 handle_call(info, _From, State) ->
     {reply, info(State), State};
 

+ 21 - 8
apps/emqx_stomp/src/emqx_stomp_heartbeat.erl

@@ -23,9 +23,10 @@
         , check/3
         , info/1
         , interval/2
+        , reset/3
         ]).
 
--record(heartbeater, {interval, statval, repeat}).
+-record(heartbeater, {interval, statval, repeat, repeat_max}).
 
 -type name() :: incoming | outgoing.
 
@@ -42,19 +43,23 @@ init({0, 0}) ->
     #{};
 init({Cx, Cy}) ->
     maps:filter(fun(_, V) -> V /= undefined end,
-      #{incoming => heartbeater(Cx),
-        outgoing => heartbeater(Cy)
+      #{incoming => heartbeater(incoming, Cx),
+        outgoing => heartbeater(outgoing, Cy)
        }).
 
-heartbeater(0) ->
+heartbeater(_, 0) ->
     undefined;
-heartbeater(I) ->
+heartbeater(N, I) ->
     #heartbeater{
        interval = I,
        statval = 0,
-       repeat = 0
+       repeat = 0,
+       repeat_max = repeat_max(N)
       }.
 
+repeat_max(incoming) -> 1;
+repeat_max(outgoing) -> 0.
+
 -spec check(name(), pos_integer(), heartbeat())
     -> {ok, heartbeat()}
      | {error, timeout}.
@@ -67,11 +72,12 @@ check(Name, NewVal, HrtBt) ->
     end.
 
 check(NewVal, HrtBter = #heartbeater{statval = OldVal,
-                                     repeat = Repeat}) ->
+                                     repeat = Repeat,
+                                     repeat_max = Max}) ->
     if
         NewVal =/= OldVal ->
             {ok, HrtBter#heartbeater{statval = NewVal, repeat = 0}};
-        Repeat < 1 ->
+        Repeat < Max ->
             {ok, HrtBter#heartbeater{repeat = Repeat + 1}};
         true -> {error, timeout}
     end.
@@ -89,3 +95,10 @@ interval(Type, HrtBt) ->
         undefined -> undefined;
         #heartbeater{interval = Intv} -> Intv
     end.
+
+reset(Type, StatVal, HrtBt) ->
+    case maps:get(Type, HrtBt, undefined) of
+        undefined -> HrtBt;
+        HrtBter ->
+            HrtBt#{Type => HrtBter#heartbeater{statval = StatVal, repeat = 0}}
+    end.

+ 25 - 4
apps/emqx_stomp/src/emqx_stomp_protocol.erl

@@ -69,6 +69,8 @@
           sendfun :: {function(), list()},
           %% Heartbeat function
           heartfun :: {function(), list()},
+          %% Get Socket stat function
+          statfun :: {function(), list()},
           %% The confs for the connection
           %% TODO: put these configs into a public mem?
           allow_anonymous :: maybe(boolean()),
@@ -77,6 +79,9 @@
 
 -define(DEFAULT_SUB_ACK, <<"auto">>).
 
+-define(INCOMING_TIMER_BACKOFF, 1.25).
+-define(OUTCOMING_TIMER_BACKOFF, 0.75).
+
 -define(TIMER_TABLE, #{
           incoming_timer => incoming,
           outgoing_timer => outgoing,
@@ -108,7 +113,8 @@
 %% @doc Init protocol
 init(ConnInfo = #{peername := {PeerHost, _Port},
                   sockname := {_Host, SockPort},
-                  sendfun := SendFun,
+                  statfun  := StatFun,
+                  sendfun  := SendFun,
                   heartfun := HeartFun}, Opts) ->
 
     NConnInfo = default_conninfo(ConnInfo),
@@ -132,6 +138,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
        clientinfo = ClientInfo,
        heartfun = HeartFun,
        sendfun = SendFun,
+       statfun = StatFun,
        timers = #{},
        transaction = #{},
        allow_anonymous = AllowAnonymous,
@@ -231,6 +238,8 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                              default_user(State)
                             ) of
                 true ->
+                    Heartbeats = parse_heartbeats(
+                                   header(<<"heart-beat">>, Headers, <<"0,0">>)),
                     ClientId = emqx_guid:to_base62(emqx_guid:gen()),
                     emqx_logger:set_metadata_clientid(ClientId),
                     ConnInfo = State#pstate.conninfo,
@@ -238,6 +247,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                     NConnInfo = ConnInfo#{
                                   proto_ver => Version,
                                   clientid => ClientId,
+                                  keepalive => element(1, Heartbeats) div 1000,
                                   username => Login
                                  },
                     NClitInfo = ClitInfo#{
@@ -250,7 +260,6 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                         emqx_cm:discard_session(ClientId),
                         emqx_cm:register_channel(ClientId, ConnPid, NConnInfo)
                     end),
-                    Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
                     NState = start_heartbeart_timer(
                                Heartbeats,
                                State#pstate{
@@ -454,11 +463,18 @@ timeout(_TRef, {incoming, NewVal},
 
 timeout(_TRef, {outgoing, NewVal},
         State = #pstate{heart_beats = HrtBt,
+                        statfun = {StatFun, StatArgs},
                         heartfun = {Fun, Args}}) ->
     case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
         {error, timeout} ->
             _ = erlang:apply(Fun, Args),
-            {ok, State};
+            case erlang:apply(StatFun, [send_oct] ++ StatArgs) of
+                {ok, NewVal2} ->
+                    NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal2, HrtBt),
+                    {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})};
+                {error, Reason} ->
+                    {shutdown, {error, {get_stats_error, Reason}}, State}
+            end;
         {ok, NHrtBt} ->
             {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
     end;
@@ -633,7 +649,11 @@ reverse_heartbeats({Cx, Cy}) ->
 start_heartbeart_timer(Heartbeats, State) ->
     ensure_timer(
       [incoming_timer, outgoing_timer],
-      State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
+      State#pstate{heart_beats = emqx_stomp_heartbeat:init(backoff(Heartbeats))}).
+
+backoff({Cx, Cy}) ->
+    {erlang:ceil(Cx * ?INCOMING_TIMER_BACKOFF),
+     erlang:ceil(Cy * ?OUTCOMING_TIMER_BACKOFF)}.
 
 %%--------------------------------------------------------------------
 %% pub & sub helpers
@@ -768,3 +788,4 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
     emqx_stomp_heartbeat:interval(outgoing, HrtBt);
 interval(clean_trans_timer, _) ->
     ?TRANS_TIMEOUT.
+