zhongwencool 2 лет назад
Родитель
Сommit
535870386a

+ 35 - 0
apps/emqx/src/config/emqx_config_zones.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_config_zones).
+
+-behaviour(emqx_config_handler).
+
+%% API
+-export([add_handler/0, remove_handler/0, pre_config_update/3]).
+
+-define(ZONES, [zones]).
+
+add_handler() ->
+    ok = emqx_config_handler:add_handler(?ZONES, ?MODULE),
+    ok.
+
+remove_handler() ->
+    ok = emqx_config_handler:remove_handler(?ZONES),
+    ok.
+
+%% replace the old config with the new config
+pre_config_update(?ZONES, NewRaw, _OldRaw) ->
+    {ok, NewRaw}.

+ 4 - 1
apps/emqx/src/emqx_config.erl

@@ -637,11 +637,13 @@ save_to_override_conf(false, RawConf, _Opts) ->
 
 add_handlers() ->
     ok = emqx_config_logger:add_handler(),
+    ok = emqx_config_zones:add_handler(),
     emqx_sys_mon:add_handler(),
     ok.
 
 remove_handlers() ->
     ok = emqx_config_logger:remove_handler(),
+    ok = emqx_config_zones:remove_handler(),
     emqx_sys_mon:remove_handler(),
     ok.
 
@@ -914,7 +916,8 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
     emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).
 
 %% When the global zone change, the zones is updated with the new global zone.
-%% The zones config has no config_handler callback, so we need to update via this hook
+%% The global zone's keys is too many,
+%% so we don't choose to write a global zone change emqx_config_handler callback to hook
 post_save_config_hook(?PERSIS_KEY(?CONF, zones), _Zones) ->
     emqx_flapping:update_config(),
     ok;

+ 15 - 5
apps/emqx/src/emqx_flapping.erl

@@ -50,6 +50,12 @@
     started_at :: pos_integer(),
     detect_cnt :: integer()
 }).
+-define(DEFAULT_POLICY, #{
+    enable => false,
+    max_count => 15,
+    window_time => 60000,
+    ban_time => 5 * 6000
+}).
 
 -opaque flapping() :: #flapping{}.
 
@@ -91,8 +97,9 @@ detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) -
 detect(_ClientId, _PeerHost, #{enable := false}) ->
     false.
 
+%% with default, if we delete Zone at running time. we should not crash.
 get_policy(Zone) ->
-    emqx_config:get_zone_conf(Zone, [flapping_detect]).
+    emqx_config:get_zone_conf(Zone, [flapping_detect], ?DEFAULT_POLICY).
 
 now_diff(TS) -> erlang:system_time(millisecond) - TS.
 
@@ -201,11 +208,14 @@ start_timers() ->
 
 update_timer(Timers) ->
     maps:map(
-        fun(ZoneName, #{flapping_detect := FlappingDetect}) ->
+        fun(ZoneName, #{flapping_detect := FlappingDetect = #{enable := Enable}}) ->
             case maps:get(ZoneName, Timers, undefined) of
-                undefined -> start_timer(FlappingDetect, ZoneName);
-                %% Don't reset this timer, it will be updated after next timeout.
-                TRef -> TRef
+                undefined ->
+                    start_timer(FlappingDetect, ZoneName);
+                TRef when Enable -> TRef;
+                TRef ->
+                    erlang:cancel_timer(TRef),
+                    undefined
             end
         end,
         emqx:get_config([zones], #{})

+ 28 - 2
apps/emqx/test/emqx_flapping_SUITE.erl

@@ -53,7 +53,6 @@ t_detect_check(_) ->
         clientid => <<"client007">>,
         peerhost => {127, 0, 0, 1}
     },
-    ct:pal("www:~p~n", [emqx_flapping:get_policy(default)]),
     false = emqx_flapping:detect(ClientInfo),
     false = emqx_banned:check(ClientInfo),
     false = emqx_flapping:detect(ClientInfo),
@@ -133,7 +132,34 @@ t_conf_update(_) ->
     %% reset to default(empty) andalso get default from global
     ?assertMatch({ok, _}, emqx:update_config([zones], #{})),
     ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
+    ?assertError({config_not_found, _}, get_policy(zone_1)),
+    ?assertError({config_not_found, _}, get_policy(zone_2)),
+    ok.
+
+t_conf_update_timer(_Config) ->
+    _ = emqx_flapping:start_link(),
+    validate_timer([default]),
+    {ok, _} =
+        emqx:update_config([zones], #{
+            <<"timer_1">> => #{<<"flapping_detect">> => #{<<"enable">> => true}},
+            <<"timer_2">> => #{<<"flapping_detect">> => #{<<"enable">> => true}},
+            <<"timer_3">> => #{<<"flapping_detect">> => #{<<"enable">> => false}}
+        }),
+    validate_timer([timer_1, timer_2, timer_3, default]),
+    ok.
+
+validate_timer(Names) ->
+    Zones = emqx:get_config([zones]),
+    ?assertEqual(lists:sort(Names), lists:sort(maps:keys(Zones))),
+    Timers = sys:get_state(emqx_flapping),
+    maps:foreach(
+        fun(Name, #{flapping_detect := #{enable := Enable}}) ->
+            ?assertEqual(Enable, is_reference(maps:get(Name, Timers)), Timers)
+        end,
+        Zones
+    ),
+    ?assertEqual(maps:keys(Zones), maps:keys(Timers)),
     ok.
 
 get_policy(Zone) ->
-    emqx_flapping:get_policy(Zone).
+    emqx_config:get_zone_conf(Zone, [flapping_detect]).

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -209,7 +209,7 @@ t_zones(_Config) ->
     ?assertEqual(Mqtt1, NewMqtt),
     %% delete the new zones
     {ok, #{}} = update_config("zones", Zones),
-    ?assertEqual(undefined, emqx_config:get_raw([new_zone, mqtt], undefined)),
+    ?assertEqual(undefined, emqx_config:get_raw([zones, new_zone], undefined)),
     ok.
 
 t_dashboard(_Config) ->