Jelajahi Sumber

Add flapping detect feature

Gilbert Wong 7 tahun lalu
induk
melakukan
bcbb4b68e9

+ 1 - 1
Makefile

@@ -38,7 +38,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
 			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
 			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
-            emqx_vm_mon emqx_alarm_handler emqx_rpc
+            emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

+ 62 - 3
etc/emqx.conf

@@ -438,6 +438,17 @@ acl_cache_ttl = 1m
 ## Default: ignore
 acl_deny_action = ignore
 
+## The cleanning interval for flapping
+##
+## Value: Duration
+## -d: day
+## -h: hour
+## -m: minute
+## -s: second
+##
+## Default: 1h, 1 hour
+## flapping_clean_interval = 1h
+
 ##--------------------------------------------------------------------
 ## MQTT Protocol
 ##--------------------------------------------------------------------
@@ -650,11 +661,35 @@ zone.external.mqueue_priorities = none
 ## Value: highest | lowest
 zone.external.mqueue_default_priority = highest
 
-## Whether to enqueue Qos0 messages.
+## Whether to enqueue QoS0 messages.
 ##
 ## Value: false | true
 zone.external.mqueue_store_qos0 = true
 
+## Whether to turn on flapping detect
+##
+## Value: on | off
+zone.external.enable_flapping_detect = off
+
+## The times of state change per min, specifying the threshold which is used to
+## detect if the connection starts flapping
+##
+## Value: number
+zone.external.flapping_threshold = 10, 1m
+
+## Flapping expiry interval for connections.
+## This config entry is used to determine when the connection
+## will be unbanned.
+##
+## Value: Duration
+## -d: day
+## -h: hour
+## -m: minute
+## -s: second
+##
+## Default: 1h, 1 hour
+zone.external.flapping_expiry_interval = 1h
+
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##
 ## Variables in mountpoint path:
@@ -726,6 +761,30 @@ zone.internal.max_mqueue_len = 1000
 ## Value: false | true
 zone.internal.mqueue_store_qos0 = true
 
+## Whether to turn on flapping detect
+##
+## Value: on | off
+zone.internal.enable_flapping_detect = off
+
+## The times of state change per second, specifying the threshold which is used to
+## detect if the connection starts flapping
+##
+## Value: number
+zone.internal.flapping_threshold = 10, 1m
+
+## Flapping expiry interval for connections.
+## This config entry is used to determine when the connection
+## will be unbanned.
+##
+## Value: Duration
+## -d: day
+## -h: hour
+## -m: minute
+## -s: second
+##
+## Default: 1h, 1 hour
+zone.internal.flapping_expiry_interval = 1h
+
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##
 ## Variables in mountpoint path:
@@ -1784,13 +1843,13 @@ listener.wss.external.send_timeout_close = on
 ## SSL Ciphers used by the bridge.
 ##
 ## Value: String
-#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
+## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
 
 ## Ciphers for TLS PSK.
 ## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
 ## be configured at the same time.
 ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
-#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
+## bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
 
 ## Ping interval of a down bridge.
 ##

+ 0 - 1
include/types.hrl

@@ -19,4 +19,3 @@
 -type(ok_or_error(Reason) :: ok | {error, Reason}).
 
 -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}).
-

+ 33 - 16
priv/emqx.schema

@@ -270,8 +270,7 @@ end}.
     X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
     _ -> undefined
   end
- end
-}.
+ end}.
 
 {validator, "zdbbl_range", "must be between 1KB and 2097151KB",
  fun(ZDBBL) ->
@@ -574,6 +573,11 @@ end}.
   {datatype, {enum, [ignore, disconnect]}}
 ]}.
 
+%% @doc time interval to clean flapping records
+{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [
+  {datatype, {duration, ms}}
+]}.
+
 {validator, "range:gt_0", "must greater than 0",
   fun(X) -> X > 0 end
 }.
@@ -814,6 +818,18 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [
+  {datatype, flag}
+]}.
+
+{mapping, "zone.$name.flapping_threshold", "emqx.zones", [
+  {datatype, string}
+]}.
+
+{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [
+  {datatype, {duration, s}}
+]}.
+
 %% @doc Force connection/session process GC after this number of
 %% messages | bytes passed through.
 %% Numbers delimited by `|'. Zero or negative is to disable.
@@ -845,6 +861,15 @@ end}.
 {translation, "emqx.zones", fun(Conf) ->
   Mapping = fun("retain_available", Val) ->
                     {mqtt_retain_available, Val};
+               ("flapping_threshold", Val) ->
+                    [Limit, Duration] = string:tokens(Val, ", "),
+                    FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
+                                            Min when is_integer(Min) ->
+                                                {list_to_integer(Limit), Min};
+                                           {error, Reason} ->
+                                                error(Reason)
+                                        end,
+                    {flapping_threshold, FlappingThreshold};
                ("wildcard_subscription", Val) ->
                     {mqtt_wildcard_subscription, Val};
                ("shared_subscription", Val) ->
@@ -2053,11 +2078,8 @@ end}.
 ]}.
 
 {translation, "emqx.sysmon", fun(Conf) ->
-  [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
-   {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
-   {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
-   {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
-   {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
+    Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf),
+    [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
 end}.
 
 %%--------------------------------------------------------------------
@@ -2095,12 +2117,8 @@ end}.
 ]}.
 
 {translation, "emqx.os_mon", fun(Conf) ->
-  [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)},
-   {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)},
-   {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)},
-   {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)},
-   {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)},
-   {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}]
+    Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf),
+    [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
 end}.
 
 %%--------------------------------------------------------------------
@@ -2122,7 +2140,6 @@ end}.
 ]}.
 
 {translation, "emqx.vm_mon", fun(Conf) ->
-  [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
-   {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
-   {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
+    Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf),
+    [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs]
 end}.

+ 3 - 4
src/emqx_banned.erl

@@ -70,13 +70,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
         orelse ets:member(?TAB, {username, Username})
             orelse ets:member(?TAB, {ipaddr, IPAddr}).
 
--spec(add(#banned{}) -> ok).
+-spec(add(emqx_types:banned()) -> ok).
 add(Banned) when is_record(Banned, banned) ->
     mnesia:dirty_write(?TAB, Banned).
 
 -spec(delete({client_id, emqx_types:client_id()}
-           | {username, emqx_types:username()}
-           | {peername, emqx_types:peername()}) -> ok).
+             | {username, emqx_types:username()}
+             | {peername, emqx_types:peername()}) -> ok).
 delete(Key) ->
     mnesia:dirty_delete(?TAB, Key).
 
@@ -127,4 +127,3 @@ expire_banned_items(Now) ->
               mnesia:delete_object(?TAB, B, sticky_write);
          (_, _Acc) -> ok
       end, ok, ?TAB).
-

+ 3 - 1
src/emqx_bridge_mqtt.erl

@@ -56,7 +56,9 @@ start(Config = #{address := Address}) ->
     ClientConfig = Config#{msg_handler => Handlers,
                            owner => AckCollector,
                            host => Host,
-                           port => Port},
+                           port => Port,
+                           bridge_mode => true
+                          },
     case emqx_client:start_link(ClientConfig) of
         {ok, Pid} ->
             case emqx_client:connect(Pid) of

+ 1 - 1
src/emqx_client.erl

@@ -88,7 +88,7 @@
              ]).
 
 %% Default timeout
--define(DEFAULT_KEEPALIVE,       60000).
+-define(DEFAULT_KEEPALIVE,       60).
 -define(DEFAULT_ACK_TIMEOUT,     30000).
 -define(DEFAULT_CONNECT_TIMEOUT, 60000).
 

+ 11 - 2
src/emqx_cm_sup.erl

@@ -30,11 +30,20 @@ init([]) ->
                shutdown => 1000,
                type => worker,
                modules => [emqx_banned]},
+    FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000),
+    Flapping = #{id => flapping,
+                 start => {emqx_flapping, start_link, [FlappingOption]},
+                 restart => permanent,
+                 shutdown => 1000,
+                 type => worker,
+                 modules => [emqx_flapping]},
     Manager = #{id => manager,
                 start => {emqx_cm, start_link, []},
                 restart => permanent,
                 shutdown => 2000,
                 type => worker,
                 modules => [emqx_cm]},
-    {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.
-
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 100,
+                 period => 10},
+    {ok, {SupFlags, [Banned, Manager, Flapping]}}.

+ 0 - 2
src/emqx_config.erl

@@ -19,7 +19,6 @@
 %% 1. Store in mnesia database?
 %% 2. Store in dets?
 %% 3. Store in data/app.config?
-%%
 
 -module(emqx_config).
 
@@ -138,4 +137,3 @@ read_(_App) -> error(no_impl).
     %         end, [], Configs),
     %         RequiredCfg ++ OptionalCfg
     % end.
-

+ 4 - 4
src/emqx_connection.erl

@@ -242,10 +242,10 @@ connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -
 connected(info, {keepalive, start, Interval},
           State = #state{transport = Transport, socket = Socket}) ->
     StatFun = fun() ->
-                case Transport:getstat(Socket, [recv_oct]) of
-                    {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
-                    Error -> Error
-                end
+                  case Transport:getstat(Socket, [recv_oct]) of
+                      {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
+                      Error -> Error
+                  end
               end,
     case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
         {ok, KeepAlive} ->

+ 130 - 50
src/emqx_flapping.erl

@@ -12,70 +12,150 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 
-%% @doc TODO:
-%% 1. Flapping Detection
-%% 2. Conflict Detection?
 -module(emqx_flapping).
 
-%% Use ets:update_counter???
+-include("emqx.hrl").
+-include("logger.hrl").
+-include("types.hrl").
 
--behaviour(gen_server).
+-behaviour(gen_statem).
 
--export([start_link/0]).
+-export([start_link/1]).
 
--export([ is_banned/1
-        , banned/1
-        ]).
+%% This module is used to garbage clean the flapping records
 
-%% gen_server callbacks
--export([ init/1
-        , handle_call/3
-        , handle_cast/2
-        , handle_info/2
-        , terminate/2
-        , code_change/3
+%% gen_statem callbacks
+-export([ terminate/3
+        , code_change/4
+        , init/1
+        , initialized/3
+        , callback_mode/0
         ]).
 
--define(SERVER, ?MODULE).
-
--record(state, {}).
-
--spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-is_banned(ClientId) ->
-    ets:member(banned, ClientId).
-
-banned(ClientId) ->
-    ets:insert(banned, {ClientId, os:timestamp()}).
+-define(FLAPPING_TAB, ?MODULE).
+
+-export([check/3]).
+
+-record(flapping,
+        { client_id   :: binary()
+        , check_count :: integer()
+        , timestamp   :: integer()
+        }).
+
+-type(flapping_record() :: #flapping{}).
+-type(flapping_state() :: flapping | ok).
+
+
+%% @doc This function is used to initialize flapping records
+%% the expiry time unit is minutes.
+-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
+init_flapping(ClientId, Interval) ->
+    #flapping{ client_id = ClientId
+             , check_count = 1
+             , timestamp = emqx_time:now_secs() + Interval
+             }.
+
+%% @doc This function is used to initialize flapping records
+%% the expiry time unit is minutes.
+-spec(check( Action :: atom()
+           , ClientId :: binary()
+           , Threshold :: {integer(), integer()})
+      -> flapping_state()).
+check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
+    check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
+
+-spec(check( Action :: atom()
+           , ClientId :: binary()
+           , Threshold :: {integer(), integer()}
+           , InitFlapping :: flapping_record())
+      -> flapping_state()).
+check(Action, ClientId, Threshold, InitFlapping) ->
+    try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of
+        CheckCount ->
+            case ets:lookup(?FLAPPING_TAB, ClientId) of
+                [Flapping] ->
+                    check_flapping(Action, CheckCount, Threshold, Flapping);
+                _Flapping ->
+                    ok
+            end
+    catch
+        error:badarg ->
+            ets:insert_new(?FLAPPING_TAB, InitFlapping),
+            ok
+    end.
+
+-spec(check_flapping( Action :: atom()
+                    , CheckTimes :: integer()
+                    , Threshold :: {integer(), integer()}
+                    , InitFlapping :: flapping_record())
+      -> flapping_state()).
+check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
+               Flapping = #flapping{ client_id = ClientId
+                                   , timestamp = Timestamp }) ->
+    case emqx_time:now_secs() of
+        NowTimestamp when NowTimestamp =< Timestamp,
+                          CheckTimes > TimesThreshold ->
+            ets:delete(?FLAPPING_TAB, ClientId),
+            flapping;
+        NowTimestamp when NowTimestamp > Timestamp,
+                          Action =:= disconnect ->
+            ets:delete(?FLAPPING_TAB, ClientId),
+            ok;
+        NowTimestamp ->
+            NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval},
+            ets:insert(?FLAPPING_TAB, NewFlapping),
+            ok
+    end.
 
 %%--------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_statem callbacks
 %%--------------------------------------------------------------------
-
-init([]) ->
-    %% ets:new(banned, [public, ordered_set, named_table]),
-    {ok, #state{}}.
-
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
+-spec(start_link(TimerInterval :: integer()) -> startlink_ret()).
+start_link(TimerInterval) ->
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
+
+init([TimerInterval]) ->
+    TabOpts = [ public
+              , set
+              , {keypos, 2}
+              , {write_concurrency, true}
+              , {read_concurrency, true}],
+    ok = emqx_tables:new(?FLAPPING_TAB, TabOpts),
+    {ok, initialized, #{timer_interval => TimerInterval}}.
+
+callback_mode() -> [state_functions, state_enter].
+
+initialized(enter, _OldState, #{timer_interval := Time}) ->
+    Action = {state_timeout, Time, clean_expired_records},
+    {keep_state_and_data, Action};
+initialized(state_timeout, clean_expired_records, #{}) ->
+    clean_expired_records(),
+    repeat_state_and_data.
+
+code_change(_Vsn, State, Data, _Extra) ->
+    {ok, State, Data}.
+
+terminate(_Reason, _StateName, _State) ->
+    emqx_tables:delete(?FLAPPING_TAB),
     ok.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 
-
+%% @doc clean expired records in ets
+clean_expired_records() ->
+    Records = ets:tab2list(?FLAPPING_TAB),
+    traverse_records(Records).
+
+traverse_records([]) ->
+    ok;
+traverse_records([#flapping{client_id = ClientId,
+                            timestamp = Timestamp} | LeftRecords]) ->
+    case emqx_time:now_secs() > Timestamp of
+        true ->
+            ets:delete(?FLAPPING_TAB, ClientId);
+        false ->
+            true
+    end,
+    traverse_records(LeftRecords).

+ 10 - 10
src/emqx_frame.erl

@@ -141,16 +141,16 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
 
     {Properties, Rest3} = parse_properties(Rest2, ProtoVer),
     {ClientId, Rest4} = parse_utf8_string(Rest3),
-    ConnPacket = #mqtt_packet_connect{proto_name   = ProtoName,
-                                      proto_ver    = ProtoVer,
-                                      is_bridge    = (BridgeTag =:= 8),
-                                      clean_start  = bool(CleanStart),
-                                      will_flag    = bool(WillFlag),
-                                      will_qos     = WillQoS,
-                                      will_retain  = bool(WillRetain),
-                                      keepalive    = KeepAlive,
-                                      properties   = Properties,
-                                      client_id    = ClientId},
+    ConnPacket = #mqtt_packet_connect{proto_name  = ProtoName,
+                                      proto_ver   = ProtoVer,
+                                      is_bridge   = (BridgeTag =:= 8),
+                                      clean_start = bool(CleanStart),
+                                      will_flag   = bool(WillFlag),
+                                      will_qos    = WillQoS,
+                                      will_retain = bool(WillRetain),
+                                      keepalive   = KeepAlive,
+                                      properties  = Properties,
+                                      client_id   = ClientId},
     {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4),
     {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)),
     {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)),

+ 58 - 30
src/emqx_protocol.erl

@@ -60,6 +60,7 @@
           is_bridge,
           enable_ban,
           enable_acl,
+          enable_flapping_detect,
           acl_deny_action,
           recv_stats,
           send_stats,
@@ -90,31 +91,32 @@ init(SocketOpts = #{ peername := Peername
                    , peercert := Peercert
                    , sendfun := SendFun}, Options)  ->
     Zone = proplists:get_value(zone, Options),
-    #pstate{zone                = Zone,
-            sendfun             = SendFun,
-            peername            = Peername,
-            peercert            = Peercert,
-            proto_ver           = ?MQTT_PROTO_V4,
-            proto_name          = <<"MQTT">>,
-            client_id           = <<>>,
-            is_assigned         = false,
-            conn_pid            = self(),
-            username            = init_username(Peercert, Options),
-            clean_start         = false,
-            topic_aliases       = #{},
-            packet_size         = emqx_zone:get_env(Zone, max_packet_size),
-            is_bridge           = false,
-            enable_ban          = emqx_zone:get_env(Zone, enable_ban, false),
-            enable_acl          = emqx_zone:get_env(Zone, enable_acl),
-            acl_deny_action     = emqx_zone:get_env(Zone, acl_deny_action, ignore),
-            recv_stats          = #{msg => 0, pkt => 0},
-            send_stats          = #{msg => 0, pkt => 0},
-            connected           = false,
-            ignore_loop         = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
-            topic_alias_maximum = #{to_client => 0, from_client => 0},
-            conn_mod            = maps:get(conn_mod, SocketOpts, undefined),
-            credentials         = #{},
-            ws_cookie           = maps:get(ws_cookie, SocketOpts, undefined)}.
+    #pstate{zone                   = Zone,
+            sendfun                = SendFun,
+            peername               = Peername,
+            peercert               = Peercert,
+            proto_ver              = ?MQTT_PROTO_V4,
+            proto_name             = <<"MQTT">>,
+            client_id              = <<>>,
+            is_assigned            = false,
+            conn_pid               = self(),
+            username               = init_username(Peercert, Options),
+            clean_start            = false,
+            topic_aliases          = #{},
+            packet_size            = emqx_zone:get_env(Zone, max_packet_size),
+            is_bridge              = false,
+            enable_ban             = emqx_zone:get_env(Zone, enable_ban, false),
+            enable_acl             = emqx_zone:get_env(Zone, enable_acl),
+            enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false),
+            acl_deny_action        = emqx_zone:get_env(Zone, acl_deny_action, ignore),
+            recv_stats             = #{msg => 0, pkt => 0},
+            send_stats             = #{msg => 0, pkt => 0},
+            connected              = false,
+            ignore_loop            = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
+            topic_alias_maximum    = #{to_client => 0, from_client => 0},
+            conn_mod               = maps:get(conn_mod, SocketOpts, undefined),
+            credentials            = #{},
+            ws_cookie              = maps:get(ws_cookie, SocketOpts, undefined)}.
 
 init_username(Peercert, Options) ->
     case proplists:get_value(peer_cert_as_username, Options) of
@@ -766,6 +768,7 @@ make_will_msg(#mqtt_packet_connect{proto_ver   = ProtoVer,
 check_connect(Packet, PState) ->
     run_check_steps([fun check_proto_ver/2,
                      fun check_client_id/2,
+                     fun check_flapping/2,
                      fun check_banned/2,
                      fun check_will_topic/2], Packet, PState).
 
@@ -798,6 +801,9 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
         false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
     end.
 
+check_flapping(#mqtt_packet_connect{}, PState) ->
+    do_flapping_detect(connect, PState).
+
 check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
     ok;
 check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
@@ -896,14 +902,16 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
 
 terminate(_Reason, #pstate{client_id = undefined}) ->
     ok;
-terminate(_Reason, #pstate{connected = false}) ->
-    ok;
-terminate(conflict, _PState) ->
+terminate(_Reason, PState = #pstate{connected = false}) ->
+    do_flapping_detect(disconnect, PState),
     ok;
-terminate(discard, _PState) ->
+terminate(Reason, PState) when Reason =:= conflict;
+                               Reason =:= discard ->
+    do_flapping_detect(disconnect, PState),
     ok;
 
-terminate(Reason, #pstate{credentials = Credentials}) ->
+terminate(Reason, PState = #pstate{credentials = Credentials}) ->
+    do_flapping_detect(disconnect, PState),
     ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
     ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
 
@@ -932,6 +940,26 @@ flag(true)  -> 1.
 %%------------------------------------------------------------------------------
 %% Execute actions in case acl deny
 
+do_flapping_detect(Action, #pstate{zone = Zone,
+                                   client_id = ClientId,
+                                   enable_flapping_detect = true}) ->
+    ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000),
+    Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
+    Until = erlang:system_time(second) + ExpiryInterval,
+    case emqx_flapping:check(Action, ClientId, Threshold) of
+        flapping ->
+            emqx_banned:add(#banned{who = {client_id, ClientId},
+                                    reason = <<"flapping">>,
+                                    by = <<"flapping_checker">>,
+                                    until = Until
+                                   }),
+            ok;
+        _Other ->
+            ok
+    end;
+do_flapping_detect(_Action, _PState) ->
+    ok.
+
 do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
                    ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
                                                         acl_deny_action = disconnect}) ->

+ 11 - 2
src/emqx_tables.erl

@@ -14,7 +14,7 @@
 
 -module(emqx_tables).
 
--export([new/2]).
+-export([new/2, delete/1]).
 
 -export([ lookup_value/2
         , lookup_value/3
@@ -30,6 +30,16 @@ new(Tab, Opts) ->
         Tab -> ok
     end.
 
+-spec(delete(atom()) -> ok).
+delete(Tab) ->
+    case ets:info(Tab, name) of
+        undefined ->
+            ok;
+        Tab ->
+            ets:delete(Tab),
+            ok
+    end.
+
 %% KV lookup
 -spec(lookup_value(atom(), term()) -> any()).
 lookup_value(Tab, Key) ->
@@ -42,4 +52,3 @@ lookup_value(Tab, Key, Def) ->
     catch
         error:badarg -> Def
     end.
-

+ 2 - 1
src/emqx_types.erl

@@ -53,6 +53,7 @@
 
 -export_type([ alarm/0
              , plugin/0
+             , banned/0
              , command/0
              ]).
 
@@ -91,6 +92,7 @@
 -type(topic_table() :: [{topic(), subopts()}]).
 -type(payload() :: binary() | iodata()).
 -type(message() :: #message{}).
+-type(banned() :: #banned{}).
 -type(delivery() :: #delivery{}).
 -type(deliver_results() :: [{route, node(), topic()} |
                             {dispatch, topic(), pos_integer()}]).
@@ -98,4 +100,3 @@
 -type(alarm() :: #alarm{}).
 -type(plugin() :: #plugin{}).
 -type(command() :: #command{}).
-

+ 7 - 0
test/emqx_access_SUITE.erl

@@ -62,6 +62,13 @@ groups() ->
        [compile_rule,
         match_rule]}].
 
+init_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_setup_steps(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_broker_helpers:run_teadown_steps().
+
 init_per_group(Group, Config) when  Group =:= access_control;
                                     Group =:= access_control_cache_mode ->
     prepare_config(Group),

+ 27 - 26
test/emqx_ct_broker_helpers.erl

@@ -62,6 +62,7 @@ run_setup_steps(Config) ->
     NewConfig = generate_config(),
     lists:foreach(fun set_app_env/1, NewConfig),
     set_bridge_env(),
+
     {ok, _} = application:ensure_all_started(?APP),
     set_log_level(Config),
     Config.
@@ -109,32 +110,32 @@ set_bridge_env() ->
 change_opts(SslType) ->
     {ok, Listeners} = application:get_env(?APP, listeners),
     NewListeners =
-    lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
-    case Protocol of
-    ssl ->
-            SslOpts = proplists:get_value(ssl_options, Opts),
-            Keyfile = local_path(["etc/certs", "key.pem"]),
-            Certfile = local_path(["etc/certs", "cert.pem"]),
-            TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
-            TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
-            TupleList3 =
-            case SslType of
-            ssl_twoway->
-                CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
-                MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
-                lists:merge(TupleList2, MutSslList);
-            _ ->
-                lists:filter(fun ({cacertfile, _}) -> false;
-                                 ({verify, _}) -> false;
-                                 ({fail_if_no_peer_cert, _}) -> false;
-                                 (_) -> true
-                             end, TupleList2)
-            end,
-            [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc];
-        _ ->
-            [Listener | Acc]
-    end
-    end, [], Listeners),
+        lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
+                            case Protocol of
+                                ssl ->
+                                    SslOpts = proplists:get_value(ssl_options, Opts),
+                                    Keyfile = local_path(["etc/certs", "key.pem"]),
+                                    Certfile = local_path(["etc/certs", "cert.pem"]),
+                                    TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
+                                    TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
+                                    TupleList3 =
+                                        case SslType of
+                                            ssl_twoway->
+                                                CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
+                                                MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
+                                                lists:merge(TupleList2, MutSslList);
+                                            _ ->
+                                                lists:filter(fun ({cacertfile, _}) -> false;
+                                                                 ({verify, _}) -> false;
+                                                                 ({fail_if_no_peer_cert, _}) -> false;
+                                                                 (_) -> true
+                                                             end, TupleList2)
+                                        end,
+                                    [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc];
+                                _ ->
+                                    [Listener | Acc]
+                            end
+                    end, [], Listeners),
     application:set_env(?APP, listeners, NewListeners).
 
 client_ssl_twoway() ->

+ 60 - 0
test/emqx_flapping_SUITE.erl

@@ -0,0 +1,60 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_flapping_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [t_flapping].
+
+init_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_setup_steps(),
+    prepare_for_test(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_broker_helpers:run_teardown_steps().
+
+t_flapping(_Config) ->
+    process_flag(trap_exit, true),
+    flapping_connect(5),
+    {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
+    {error, _} = emqx_client:connect(C),
+    receive
+        {'EXIT', Client, _Reason} ->
+            ct:log("receive exit signal, Client: ~p", [Client])
+    after 1000 ->
+            ct:log("timeout")
+    end.
+
+
+flapping_connect(Times) ->
+    [flapping_connect() || _ <- lists:seq(1, Times)].
+
+flapping_connect() ->
+    {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
+    {ok, _} = emqx_client:connect(C),
+    ok = emqx_client:disconnect(C).
+
+prepare_for_test() ->
+    emqx_zone:set_env(external, enable_flapping_detect, true),
+    emqx_zone:set_env(external, flapping_threshold, {10, 60}),
+    emqx_zone:set_env(external, flapping_expiry_interval, 3600).

+ 3 - 1
test/emqx_tables_SUITE.erl

@@ -23,4 +23,6 @@ t_new(_) ->
     ok = emqx_tables:new(test_table, [{read_concurrency, true}]),
     ets:insert(test_table, {key, 100}),
     ok = emqx_tables:new(test_table, [{read_concurrency, true}]),
-    100 = ets:lookup_element(test_table, key, 2).
+    100 = ets:lookup_element(test_table, key, 2),
+    ok = emqx_tables:delete(test_table),
+    ok = emqx_tables:delete(test_table).

+ 0 - 1
test/emqx_zone_SUITE.erl

@@ -35,4 +35,3 @@ t_set_get_env(_) ->
     emqx_zone:force_reload(),
     ?assertEqual(val, emqx_zone:get_env(zone1, key)),
     emqx_zone:stop().
-