Explorar o código

Call emqx_flapping:detect and generate alarm when flapping is detected

zhouzb %!s(int64=6) %!d(string=hai) anos
pai
achega
24bfaa768d
Modificáronse 3 ficheiros con 23 adicións e 11 borrados
  1. 3 1
      src/emqx_channel.erl
  2. 14 6
      src/emqx_flapping.erl
  3. 6 4
      test/emqx_flapping_SUITE.erl

+ 3 - 1
src/emqx_channel.erl

@@ -671,7 +671,9 @@ handle_info(disconnected, Channel = #channel{connected = false}) ->
     {ok, Channel};
 
 handle_info(disconnected, Channel = #channel{protocol = Protocol,
-                                             session  = Session}) ->
+                                             session  = Session,
+                                             client   = Client = #{zone := Zone}}) ->
+    emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(Client),
     Channel1 = ensure_disconnected(Channel),
     Channel2 = case timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)) of
                    0 ->

+ 14 - 6
src/emqx_flapping.erl

@@ -141,11 +141,11 @@ handle_cast({detected, Flapping = #flapping{client_id  = ClientId,
             %% Log first
             ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
                  [ClientId, esockd_net:format(Peername), DetectCnt, Duration]),
-            %% TODO: Send Alarm
             %% Banned.
             BannedFlapping = Flapping#flapping{client_id = {banned, ClientId},
                                                banned_at = emqx_time:now_ms()
                                               },
+            alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}),
             ets:insert(?FLAPPING_TAB, BannedFlapping);
         false ->
             ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
@@ -189,9 +189,17 @@ with_flapping_tab(Fun, Args) ->
     end.
 
 expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
-    ets:select_delete(?FLAPPING_TAB,
-                      [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
-                        [{'<', '$1', NowTime-Duration}], [true]},
-                       {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'},
-                        [{'<', '$1', NowTime-Interval}], [true]}]).
+    case ets:select(?FLAPPING_TAB,
+                    [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
+                      [{'<', '$1', NowTime-Duration}], ['$_']},
+                     {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'},
+                      [{'<', '$1', NowTime-Interval}], ['$_']}]) of
+        [] -> ok;
+        Flappings ->
+            lists:foreach(fun(Flapping = #flapping{client_id = {banned, ClientId}}) ->
+                              ets:delete_object(?FLAPPING_TAB, Flapping),
+                              alarm_handler:clear_alarm({flapping_detected, ClientId});
+                             (_) -> ok
+                            end, Flappings)
+    end.
 

+ 6 - 4
test/emqx_flapping_SUITE.erl

@@ -22,22 +22,24 @@
 all() -> emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
-    prepare_env(),
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([], fun set_special_configs/1),
     Config.
 
-prepare_env() ->
+set_special_configs(emqx) ->
     emqx_zone:set_env(external, enable_flapping_detect, true),
     application:set_env(emqx, flapping_detect_policy,
                         #{threshold => 3,
                           duration => 100,
                           banned_interval => 200
-                         }).
+                         });
+set_special_configs(_App) -> ok.
 
 end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]),
     ok.
 
 t_detect_check(_) ->
-    {ok, _Pid} = emqx_flapping:start_link(),
     Client = #{zone => external,
                client_id => <<"clientid">>,
                peername => {{127,0,0,1}, 5000}