Просмотр исходного кода

Refactor flapping code (#2476)

* Refactor flapping code
Gilbert 6 лет назад
Родитель
Сommit
eaa1deab20
5 измененных файлов с 22 добавлено и 48 удалено
  1. 2 2
      etc/emqx.conf
  2. 1 1
      priv/emqx.schema
  3. 0 1
      src/emqx_cm.erl
  4. 16 40
      src/emqx_flapping.erl
  5. 3 4
      src/emqx_protocol.erl

+ 2 - 2
etc/emqx.conf

@@ -694,7 +694,7 @@ zone.external.flapping_threshold = 10, 1m
 ## -s: second
 ##
 ## Default: 1h, 1 hour
-zone.external.flapping_expiry_interval = 1h
+zone.external.flapping_banned_expiry_interval = 1h
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##
@@ -789,7 +789,7 @@ zone.internal.flapping_threshold = 10, 1m
 ## -s: second
 ##
 ## Default: 1h, 1 hour
-zone.internal.flapping_expiry_interval = 1h
+zone.internal.flapping_banned_expiry_interval = 1h
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##

+ 1 - 1
priv/emqx.schema

@@ -836,7 +836,7 @@ end}.
   {datatype, string}
 ]}.
 
-{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [
+{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [
   {datatype, {duration, s}}
 ]}.
 

+ 0 - 1
src/emqx_cm.erl

@@ -207,4 +207,3 @@ stats_fun() ->
         undefined -> ok;
         Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
     end.
-

+ 16 - 40
src/emqx_flapping.erl

@@ -50,27 +50,23 @@
 %% the expiry time unit is minutes.
 -spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
 init_flapping(ClientId, Interval) ->
-    #flapping{ client_id = ClientId
-             , check_count = 1
-             , timestamp = emqx_time:now_secs() + Interval
-             }.
+    #flapping{client_id = ClientId,
+              check_count = 1,
+              timestamp = emqx_time:now_secs() + Interval}.
 
 %% @doc This function is used to initialize flapping records
 %% the expiry time unit is minutes.
--spec(check( Action :: atom()
-           , ClientId :: binary()
-           , Threshold :: {integer(), integer()})
-      -> flapping_state()).
+-spec(check(Action :: atom(), ClientId :: binary(),
+            Threshold :: {integer(), integer()}) -> flapping_state()).
 check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
     check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
 
--spec(check( Action :: atom()
-           , ClientId :: binary()
-           , Threshold :: {integer(), integer()}
-           , InitFlapping :: flapping_record())
-      -> flapping_state()).
+-spec(check(Action :: atom(), ClientId :: binary(),
+            Threshold :: {integer(), integer()},
+            InitFlapping :: flapping_record()) -> flapping_state()).
 check(Action, ClientId, Threshold, InitFlapping) ->
-    try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of
+    case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of
+        1 -> ok;
         CheckCount ->
             case ets:lookup(?FLAPPING_TAB, ClientId) of
                 [Flapping] ->
@@ -78,23 +74,14 @@ check(Action, ClientId, Threshold, InitFlapping) ->
                 _Flapping ->
                     ok
             end
-    catch
-        error:badarg ->
-            ets:insert_new(?FLAPPING_TAB, InitFlapping),
-            ok
     end.
 
--spec(check_flapping( Action :: atom()
-                    , CheckTimes :: integer()
-                    , Threshold :: {integer(), integer()}
-                    , InitFlapping :: flapping_record())
-      -> flapping_state()).
-check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
+check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
                Flapping = #flapping{ client_id = ClientId
                                    , timestamp = Timestamp }) ->
     case emqx_time:now_secs() of
         NowTimestamp when NowTimestamp =< Timestamp,
-                          CheckTimes > TimesThreshold ->
+                          CheckCount > TimesThreshold ->
             ets:delete(?FLAPPING_TAB, ClientId),
             flapping;
         NowTimestamp when NowTimestamp > Timestamp,
@@ -110,7 +97,7 @@ check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
 %%--------------------------------------------------------------------
 %% gen_statem callbacks
 %%--------------------------------------------------------------------
--spec(start_link(TimerInterval :: integer()) -> startlink_ret()).
+-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
 start_link(TimerInterval) ->
     gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
 
@@ -145,17 +132,6 @@ terminate(_Reason, _StateName, _State) ->
 
 %% @doc clean expired records in ets
 clean_expired_records() ->
-    Records = ets:tab2list(?FLAPPING_TAB),
-    traverse_records(Records).
-
-traverse_records([]) ->
-    ok;
-traverse_records([#flapping{client_id = ClientId,
-                            timestamp = Timestamp} | LeftRecords]) ->
-    case emqx_time:now_secs() > Timestamp of
-        true ->
-            ets:delete(?FLAPPING_TAB, ClientId);
-        false ->
-            true
-    end,
-    traverse_records(LeftRecords).
+    NowTime = emqx_time:now_secs(),
+    MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
+    ets:select_delete(?FLAPPING_TAB, MatchSpec).

+ 3 - 4
src/emqx_protocol.erl

@@ -943,16 +943,15 @@ flag(true)  -> 1.
 do_flapping_detect(Action, #pstate{zone = Zone,
                                    client_id = ClientId,
                                    enable_flapping_detect = true}) ->
-    ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000),
+    BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
     Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
-    Until = erlang:system_time(second) + ExpiryInterval,
+    Until = erlang:system_time(second) + BanExpiryInterval,
     case emqx_flapping:check(Action, ClientId, Threshold) of
         flapping ->
             emqx_banned:add(#banned{who = {client_id, ClientId},
                                     reason = <<"flapping">>,
                                     by = <<"flapping_checker">>,
-                                    until = Until
-                                   }),
+                                    until = Until}),
             ok;
         _Other ->
             ok