|
|
@@ -27,6 +27,10 @@
|
|
|
%% API
|
|
|
-export([detect/1]).
|
|
|
|
|
|
+-ifdef(TEST).
|
|
|
+-export([get_policy/2]).
|
|
|
+-endif.
|
|
|
+
|
|
|
%% gen_server callbacks
|
|
|
-export([
|
|
|
init/1,
|
|
|
@@ -39,15 +43,6 @@
|
|
|
|
|
|
%% Tab
|
|
|
-define(FLAPPING_TAB, ?MODULE).
|
|
|
-%% Default Policy
|
|
|
--define(FLAPPING_THRESHOLD, 30).
|
|
|
--define(FLAPPING_DURATION, 60000).
|
|
|
--define(FLAPPING_BANNED_INTERVAL, 300000).
|
|
|
--define(DEFAULT_DETECT_POLICY, #{
|
|
|
- max_count => ?FLAPPING_THRESHOLD,
|
|
|
- window_time => ?FLAPPING_DURATION,
|
|
|
- ban_time => ?FLAPPING_BANNED_INTERVAL
|
|
|
-}).
|
|
|
|
|
|
-record(flapping, {
|
|
|
clientid :: emqx_types:clientid(),
|
|
|
@@ -69,7 +64,7 @@ stop() -> gen_server:stop(?MODULE).
|
|
|
%% @doc Detect flapping when a MQTT client disconnected.
|
|
|
-spec detect(emqx_types:clientinfo()) -> boolean().
|
|
|
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
|
|
|
- Policy = #{max_count := Threshold} = get_policy(Zone),
|
|
|
+ Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone),
|
|
|
%% The initial flapping record sets the detect_cnt to 0.
|
|
|
InitVal = #flapping{
|
|
|
clientid = ClientId,
|
|
|
@@ -89,8 +84,22 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-get_policy(Zone) ->
|
|
|
- emqx_config:get_zone_conf(Zone, [flapping_detect]).
|
|
|
+get_policy(Keys, Zone) when is_list(Keys) ->
|
|
|
+ RootKey = flapping_detect,
|
|
|
+ Conf = emqx_config:get_zone_conf(Zone, [RootKey]),
|
|
|
+ lists:foldl(
|
|
|
+ fun(Key, Acc) ->
|
|
|
+ case maps:find(Key, Conf) of
|
|
|
+ {ok, V} -> Acc#{Key => V};
|
|
|
+ error -> Acc#{Key => emqx_config:get([RootKey, Key])}
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ #{},
|
|
|
+ Keys
|
|
|
+ );
|
|
|
+get_policy(Key, Zone) ->
|
|
|
+ #{Key := Conf} = get_policy([Key], Zone),
|
|
|
+ Conf.
|
|
|
|
|
|
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
|
|
|
|
|
@@ -166,8 +175,7 @@ handle_cast(Msg, State) ->
|
|
|
|
|
|
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
|
|
|
Timestamp =
|
|
|
- erlang:system_time(millisecond) -
|
|
|
- maps:get(window_time, get_policy(Zone)),
|
|
|
+ erlang:system_time(millisecond) - get_policy(window_time, Zone),
|
|
|
MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}],
|
|
|
ets:select_delete(?FLAPPING_TAB, MatchSpec),
|
|
|
_ = start_timer(Zone),
|
|
|
@@ -183,15 +191,19 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
start_timer(Zone) ->
|
|
|
- WindTime = maps:get(window_time, get_policy(Zone)),
|
|
|
- emqx_utils:start_timer(WindTime, {garbage_collect, Zone}).
|
|
|
+ case get_policy(window_time, Zone) of
|
|
|
+ WindowTime when is_integer(WindowTime) ->
|
|
|
+ emqx_utils:start_timer(WindowTime, {garbage_collect, Zone});
|
|
|
+ disabled ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
|
|
|
start_timers() ->
|
|
|
- lists:foreach(
|
|
|
- fun({Zone, _ZoneConf}) ->
|
|
|
+ maps:foreach(
|
|
|
+ fun(Zone, _ZoneConf) ->
|
|
|
start_timer(Zone)
|
|
|
end,
|
|
|
- maps:to_list(emqx:get_config([zones], #{}))
|
|
|
+ emqx:get_config([zones], #{})
|
|
|
).
|
|
|
|
|
|
fmt_host(PeerHost) ->
|