Forráskód Böngészése

refactor: more rigorous checking of flapping to improve stability of the system

port: https://github.com/emqx/emqx/pull/9045
JianBo He 3 éve
szülő
commit
56000cbf3e
2 módosított fájl, 47 hozzáadás és 14 törlés
  1. 11 5
      apps/emqx/src/emqx_channel.erl
  2. 36 9
      apps/emqx/test/emqx_channel_SUITE.erl

+ 11 - 5
apps/emqx/src/emqx_channel.erl

@@ -345,7 +345,8 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
                 fun check_connect/2,
                 fun enrich_client/2,
                 fun set_log_meta/2,
-                fun check_banned/2
+                fun check_banned/2,
+                fun count_flapping_event/2
             ],
             ConnPkt,
             Channel#channel{conn_state = connecting}
@@ -1260,14 +1261,11 @@ handle_info(
     {sock_closed, Reason},
     Channel =
         #channel{
-            conn_state = ConnState,
-            clientinfo = ClientInfo = #{zone := Zone}
+            conn_state = ConnState
         }
 ) when
     ConnState =:= connected orelse ConnState =:= reauthenticating
 ->
-    emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
-        emqx_flapping:detect(ClientInfo),
     Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
     case maybe_shutdown(Reason, Channel1) of
         {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
@@ -1636,6 +1634,14 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
         false -> ok
     end.
 
+%%--------------------------------------------------------------------
+%% Flapping
+
+count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
+    emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
+        emqx_flapping:detect(ClientInfo),
+    {ok, Channel}.
+
 %%--------------------------------------------------------------------
 %% Authenticate
 

+ 36 - 9
apps/emqx/test/emqx_channel_SUITE.erl

@@ -207,14 +207,6 @@ init_per_suite(Config) ->
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
     ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
-    %% Access Control Meck
-    ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
-    ok = meck:expect(
-        emqx_access_control,
-        authenticate,
-        fun(_) -> {ok, #{is_superuser => false}} end
-    ),
-    ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
     %% Broker Meck
     ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
     %% Hooks Meck
@@ -234,7 +226,6 @@ init_per_suite(Config) ->
 
 end_per_suite(_Config) ->
     meck:unload([
-        emqx_access_control,
         emqx_metrics,
         emqx_session,
         emqx_broker,
@@ -244,11 +235,21 @@ end_per_suite(_Config) ->
     ]).
 
 init_per_testcase(_TestCase, Config) ->
+    %% Access Control Meck
+    ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) -> {ok, #{is_superuser => false}} end
+    ),
+    ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
+    %% Set confs
     OldConf = set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
     [{config, OldConf} | Config].
 
 end_per_testcase(_TestCase, Config) ->
+    meck:unload([emqx_access_control]),
     emqx_config:put(?config(config, Config)),
     emqx_common_test_helpers:stop_apps([]),
     Config.
@@ -1115,6 +1116,32 @@ t_ws_cookie_init(_) ->
     ),
     ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
 
+%%--------------------------------------------------------------------
+%% Test cases for other mechnisms
+%%--------------------------------------------------------------------
+
+t_flapping_detect(_) ->
+    emqx_config:put_zone_conf(default, [flapping_detect, enable], true),
+    Parent = self(),
+    ok = meck:expect(
+        emqx_cm,
+        open_session,
+        fun(true, _ClientInfo, _ConnInfo) ->
+            {ok, #{session => session(), present => false}}
+        end
+    ),
+    ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
+    ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
+    IdleChannel = channel(#{conn_state => idle}),
+    {shutdown, not_authorized, _ConnAck, _Channel} =
+        emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
+    receive
+        flapping_detect -> ok
+    after 2000 ->
+        ?assert(false, "Flapping detect should be exected in connecting progress")
+    end,
+    meck:unload([emqx_flapping]).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------