소스 검색

fix: Revert "feat(banned): clean retained/delayed data when client is banned"

This reverts commit 69701ff578adf2085f32cd4af484e725ea704123.
firest 3 년 전
부모
커밋
ebaba0c2b1

+ 0 - 13
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -494,19 +494,6 @@ emqx_schema {
     }
   }
 
-  flapping_detect_clean_when_banned {
-    desc {
-        en: "Clean retained/delayed messages when client is banned.\n"
-            "Note: This may be expensive and only supports users banned by clientid."
-        zh: "当客户端被禁时删除其保留、延迟消息"
-            "注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"
-    }
-    label: {
-        en: "Clean when banned"
-        zh: "被禁时清理消息"
-    }
-  }
-
   persistent_session_store_enabled {
     desc {
         en: "Use the database to store information about persistent sessions.\n"

+ 0 - 1
apps/emqx/priv/bpapi.versions

@@ -9,7 +9,6 @@
 {emqx_conf,2}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
-{emqx_delayed,2}.
 {emqx_exhook,1}.
 {emqx_gateway_api_listeners,1}.
 {emqx_gateway_cm,1}.

+ 12 - 42
apps/emqx/src/emqx_banned.erl

@@ -32,13 +32,11 @@
 -export([
     check/1,
     create/1,
-    create/2,
     look_up/1,
     delete/1,
     info/1,
     format/1,
-    parse/1,
-    parse_opts/1
+    parse/1
 ]).
 
 %% gen_server callbacks
@@ -65,13 +63,6 @@
 -compile(nowarn_export_all).
 -endif.
 
--type banned_opts() :: #{
-    clean => boolean(),
-    atom() => term()
-}.
-
--export_type([banned_opts/0]).
-
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
@@ -150,11 +141,6 @@ parse(Params) ->
                     {error, ErrorReason}
             end
     end.
-
-parse_opts(Params) ->
-    Clean = maps:get(<<"clean">>, Params, false),
-    #{clean => Clean}.
-
 pares_who(#{as := As, who := Who}) ->
     pares_who(#{<<"as">> => As, <<"who">> => Who});
 pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) ->
@@ -176,15 +162,13 @@ to_rfc3339(Timestamp) ->
 
 -spec create(emqx_types:banned() | map()) ->
     {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
-create(
-    #{
-        who := Who,
-        by := By,
-        reason := Reason,
-        at := At,
-        until := Until
-    } = Data
-) ->
+create(#{
+    who := Who,
+    by := By,
+    reason := Reason,
+    at := At,
+    until := Until
+}) ->
     Banned = #banned{
         who = Who,
         by = By,
@@ -192,16 +176,11 @@ create(
         at = At,
         until = Until
     },
-    create(Banned, Data);
-create(Banned = #banned{}) ->
-    create(Banned, #{clean => false}).
-
--spec create(emqx_types:banned(), banned_opts()) ->
-    {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
-create(Banned = #banned{who = Who}, Opts) ->
+    create(Banned);
+create(Banned = #banned{who = Who}) ->
     case look_up(Who) of
         [] ->
-            insert_banned(Banned, Opts),
+            mria:dirty_write(?BANNED_TAB, Banned),
             {ok, Banned};
         [OldBanned = #banned{until = Until}] ->
             %% Don't support shorten or extend the until time by overwrite.
@@ -211,7 +190,7 @@ create(Banned = #banned{who = Who}, Opts) ->
                     {error, {already_exist, OldBanned}};
                 %% overwrite expired one is ok.
                 false ->
-                    insert_banned(Banned, Opts),
+                    mria:dirty_write(?BANNED_TAB, Banned),
                     {ok, Banned}
             end
     end.
@@ -287,12 +266,3 @@ expire_banned_items(Now) ->
         ok,
         ?BANNED_TAB
     ).
-
-insert_banned(Banned, Opts) ->
-    mria:dirty_write(?BANNED_TAB, Banned),
-    run_hooks(Banned, Opts).
-
-run_hooks(Banned, #{clean := true}) ->
-    emqx_hooks:run('client.banned', [Banned]);
-run_hooks(_Banned, _Opts) ->
-    ok.

+ 2 - 2
apps/emqx/src/emqx_flapping.erl

@@ -121,7 +121,7 @@ handle_cast(
             started_at = StartedAt,
             detect_cnt = DetectCnt
         },
-        #{window_time := WindTime, ban_time := Interval, clean_when_banned := Clean}},
+        #{window_time := WindTime, ban_time := Interval}},
     State
 ) ->
     case now_diff(StartedAt) < WindTime of
@@ -145,7 +145,7 @@ handle_cast(
                 at = Now,
                 until = Now + (Interval div 1000)
             },
-            {ok, _} = emqx_banned:create(Banned, #{clean => Clean}),
+            {ok, _} = emqx_banned:create(Banned),
             ok;
         false ->
             ?SLOG(

+ 0 - 8
apps/emqx/src/emqx_schema.erl

@@ -640,14 +640,6 @@ fields("flapping_detect") ->
                     default => "5m",
                     desc => ?DESC(flapping_detect_ban_time)
                 }
-            )},
-        {"clean_when_banned",
-            sc(
-                boolean(),
-                #{
-                    default => false,
-                    desc => ?DESC(flapping_detect_clean_when_banned)
-                }
             )}
     ];
 fields("force_shutdown") ->

+ 1 - 2
apps/emqx/test/emqx_flapping_SUITE.erl

@@ -34,8 +34,7 @@ init_per_suite(Config) ->
             % 0.1s
             window_time => 100,
             %% 2s
-            ban_time => 2000,
-            clean_when_banned => false
+            ban_time => 2000
         }
     ),
     Config.

+ 0 - 12
apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf

@@ -95,16 +95,4 @@ emqx_mgmt_api_banned {
             zh: """封禁结束时间"""
         }
     }
-    clean {
-        desc {
-            en: """Clean retained/delayed messages when client is banned."""
-                """Note: This may be expensive and only supports users banned by clientid."""
-            zh: """当客户端被禁时删除其保留、延迟消息"""
-                """注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"""
-        }
-        label {
-            en: """Clean when banned"""
-            zh: """被禁时清理消息"""
-        }
-    }
 }

+ 1 - 9
apps/emqx_management/src/emqx_mgmt_api_banned.erl

@@ -150,13 +150,6 @@ fields(ban) ->
                 desc => ?DESC(until),
                 required => false,
                 example => <<"2021-10-25T21:53:47+08:00">>
-            })},
-        {clean,
-            hoconsc:mk(boolean(), #{
-                desc => ?DESC(clean),
-                required => false,
-                default => false,
-                example => false
             })}
     ].
 
@@ -168,8 +161,7 @@ banned(post, #{body := Body}) ->
         {error, Reason} ->
             {400, 'BAD_REQUEST', list_to_binary(Reason)};
         Ban ->
-            Opts = emqx_banned:parse_opts(Body),
-            case emqx_banned:create(Ban, Opts) of
+            case emqx_banned:create(Ban) of
                 {ok, Banned} ->
                     {200, format(Banned)};
                 {error, {already_exist, Old}} ->

+ 5 - 25
apps/emqx_modules/src/emqx_delayed.erl

@@ -23,7 +23,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
--include_lib("stdlib/include/ms_transform.hrl").
 
 %% Mnesia bootstrap
 -export([mnesia/1]).
@@ -32,8 +31,7 @@
 
 -export([
     start_link/0,
-    on_message_publish/1,
-    on_client_banned/1
+    on_message_publish/1
 ]).
 
 %% gen_server callbacks
@@ -46,7 +44,7 @@
     code_change/3
 ]).
 
-%% API
+%% gen_server callbacks
 -export([
     load/0,
     unload/0,
@@ -59,9 +57,7 @@
     delete_delayed_message/1,
     delete_delayed_message/2,
     cluster_list/1,
-    cluster_query/4,
-    clean_by_clientid/1,
-    do_clean_by_clientid/1
+    cluster_query/4
 ]).
 
 -export([
@@ -142,11 +138,6 @@ on_message_publish(
 on_message_publish(Msg) ->
     {ok, Msg}.
 
-on_client_banned(#banned{who = {clientid, ClientId}}) ->
-    clean_by_clientid(ClientId);
-on_client_banned(_) ->
-    ok.
-
 %%--------------------------------------------------------------------
 %% Start delayed publish server
 %%--------------------------------------------------------------------
@@ -237,7 +228,7 @@ get_delayed_message(Id) ->
 get_delayed_message(Node, Id) when Node =:= node() ->
     get_delayed_message(Id);
 get_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v2:get_delayed_message(Node, Id).
+    emqx_delayed_proto_v1:get_delayed_message(Node, Id).
 
 -spec delete_delayed_message(binary()) -> with_id_return().
 delete_delayed_message(Id) ->
@@ -252,7 +243,7 @@ delete_delayed_message(Id) ->
 delete_delayed_message(Node, Id) when Node =:= node() ->
     delete_delayed_message(Id);
 delete_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
+    emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
 
 update_config(Config) ->
     emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@@ -261,15 +252,6 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) ->
     Enable = maps:get(enable, NewConf, undefined),
     load_or_unload(Enable).
 
-clean_by_clientid(ClientId) ->
-    Nodes = mria_mnesia:running_nodes(),
-    emqx_delayed_proto_v2:clean_by_clientid(Nodes, ClientId).
-
-do_clean_by_clientid(ClientId) ->
-    ets:select_delete(
-        ?TAB, ets:fun2ms(fun(#delayed_message{msg = Msg}) -> Msg#message.from =:= ClientId end)
-    ).
-
 %%--------------------------------------------------------------------
 %% gen_server callback
 %%--------------------------------------------------------------------
@@ -401,11 +383,9 @@ delayed_count() -> mnesia:table_info(?TAB, size).
 
 do_load_or_unload(true, State) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
-    ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
     State;
 do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
     emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
-    ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
     emqx_misc:cancel_timer(PubTimer),
     ets:delete_all_objects(?TAB),
     State#{publish_timer := undefined, publish_at := 0};

+ 0 - 47
apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl

@@ -1,47 +0,0 @@
-%%--------------------------------------------------------------------
-%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_delayed_proto_v2).
-
--behaviour(emqx_bpapi).
-
--export([
-    introduced_in/0,
-    get_delayed_message/2,
-    delete_delayed_message/2,
-    clean_by_clientid/2
-]).
-
--include_lib("emqx/include/bpapi.hrl").
-
--define(TIMEOUT, 15000).
-
-introduced_in() ->
-    "5.0.10".
-
--spec get_delayed_message(node(), binary()) ->
-    emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
-get_delayed_message(Node, Id) ->
-    rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-
--spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
-delete_delayed_message(Node, Id) ->
-    rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
-
--spec clean_by_clientid(list(node()), emqx_types:clientid()) ->
-    emqx_rpc:erpc_multicall().
-clean_by_clientid(Nodes, ClientID) ->
-    erpc:multicall(Nodes, emqx_delayed, do_clean_by_clientid, [ClientID], ?TIMEOUT).

+ 0 - 61
apps/emqx_modules/test/emqx_delayed_SUITE.erl

@@ -212,67 +212,6 @@ t_delayed_precision(_) ->
     _ = on_message_publish(DelayedMsg0),
     ?assert(FutureDiff() =< MaxSpan).
 
-t_banned_clean(_) ->
-    emqx:update_config([delayed, max_delayed_messages], 10000),
-    ClientId1 = <<"bc1">>,
-    ClientId2 = <<"bc2">>,
-    {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-
-    {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C2),
-
-    [
-        begin
-            emqtt:publish(
-                Conn,
-                <<"$delayed/60/0/", ClientId/binary>>,
-                <<"">>,
-                [{qos, 0}, {retain, false}]
-            ),
-            emqtt:publish(
-                Conn,
-                <<"$delayed/60/1/", ClientId/binary>>,
-                <<"">>,
-                [{qos, 0}, {retain, false}]
-            )
-        end
-     || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
-    ],
-
-    emqtt:publish(
-        C2,
-        <<"$delayed/60/2/", ClientId2/binary>>,
-        <<"">>,
-        [{qos, 0}, {retain, false}]
-    ),
-
-    timer:sleep(500),
-    ?assertMatch(#{meta := #{count := 5}}, emqx_delayed:list(#{page => 1, limit => 10})),
-
-    Now = erlang:system_time(second),
-    Who = {clientid, ClientId2},
-    try
-        emqx_banned:create(#{
-            who => Who,
-            by => <<"test">>,
-            reason => <<"test">>,
-            at => Now,
-            until => Now + 120,
-            clean => true
-        }),
-
-        timer:sleep(500),
-
-        ?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10}))
-    after
-        emqx_banned:delete(Who),
-        emqx_delayed:clean_by_clientid(ClientId1)
-    end,
-    timer:sleep(500),
-    ok = emqtt:disconnect(C1),
-    ok = emqtt:disconnect(C2).
-
 subscribe_proc() ->
     Self = self(),
     Ref = erlang:make_ref(),

+ 1 - 23
apps/emqx_retainer/src/emqx_retainer.erl

@@ -19,7 +19,6 @@
 -behaviour(gen_server).
 
 -include("emqx_retainer.hrl").
--include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 
@@ -27,8 +26,7 @@
 
 -export([
     on_session_subscribed/4,
-    on_message_publish/2,
-    on_client_banned/1
+    on_message_publish/2
 ]).
 
 -export([
@@ -41,7 +39,6 @@
     get_expiry_time/1,
     update_config/1,
     clean/0,
-    clean_by_clientid/1,
     delete/1,
     page_read/3,
     post_config_update/5,
@@ -83,7 +80,6 @@
 -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
 -callback clear_expired(context()) -> ok.
 -callback clean(context()) -> ok.
--callback clean_by_clientid(context(), emqx_types:clientid()) -> ok.
 -callback size(context()) -> non_neg_integer().
 
 %%--------------------------------------------------------------------
@@ -122,11 +118,6 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
 on_message_publish(Msg, _) ->
     {ok, Msg}.
 
-on_client_banned(#banned{who = {clientid, ClientId}}) ->
-    clean_by_clientid(ClientId);
-on_client_banned(_) ->
-    ok.
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -160,9 +151,6 @@ update_config(Conf) ->
 clean() ->
     call(?FUNCTION_NAME).
 
-clean_by_clientid(ClientId) ->
-    call({?FUNCTION_NAME, ClientId}).
-
 delete(Topic) ->
     call({?FUNCTION_NAME, Topic}).
 
@@ -219,9 +207,6 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
 handle_call(clean, _, #{context := Context} = State) ->
     clean(Context),
     {reply, ok, State};
-handle_call({clean_by_clientid, ClientId}, _, #{context := Context} = State) ->
-    clean_by_clientid(Context, ClientId),
-    {reply, ok, State};
 handle_call({delete, Topic}, _, #{context := Context} = State) ->
     delete_message(Context, Topic),
     {reply, ok, State};
@@ -313,11 +298,6 @@ clean(Context) ->
     Mod = get_backend_module(),
     Mod:clean(Context).
 
--spec clean_by_clientid(context(), emqx_types:clientid()) -> ok.
-clean_by_clientid(Context, ClientId) ->
-    Mod = get_backend_module(),
-    Mod:clean_by_clientid(Context, ClientId).
-
 -spec update_config(state(), hocons:config(), hocons:config()) -> state().
 update_config(State, Conf, OldConf) ->
     update_config(
@@ -453,13 +433,11 @@ load(Context) ->
         'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER
     ),
     ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER),
-    ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
     emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
     ok.
 
 unload() ->
     ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
     ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
-    ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
     emqx_stats:cancel_update(emqx_retainer_stats),
     ok.

+ 9 - 24
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -33,14 +33,13 @@
     match_messages/3,
     clear_expired/1,
     clean/1,
-    clean_by_clientid/2,
     size/1
 ]).
 
 %% Internal exports (RPC)
 -export([
     do_store_retained/1,
-    do_clear/1,
+    do_clear_expired/0,
     do_delete_message/1,
     do_populate_index_meta/1,
     do_reindex_batch/2
@@ -62,8 +61,6 @@
 -record(retained_index, {key, expiry_time}).
 -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
 
--type retained_message() :: #retained_message{}.
-
 -define(META_KEY, index_meta).
 
 -define(CLEAR_BATCH_SIZE, 1000).
@@ -167,22 +164,18 @@ do_store_retained(#message{topic = Topic} = Msg) ->
     end.
 
 clear_expired(_) ->
-    NowMs = erlang:system_time(millisecond),
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
-        fun(
-            #retained_message{expiry_time = ExpiryTime}
-        ) ->
-            (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
-        end
-    ]),
+    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
     ok.
 
--spec do_clear(fun((retained_message()) -> boolean())) -> ok.
-do_clear(Pred) ->
+do_clear_expired() ->
+    NowMs = erlang:system_time(millisecond),
     QH = qlc:q([
         TopicTokens
-     || #retained_message{topic = TopicTokens} = Data <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
-        Pred(Data)
+     || #retained_message{
+            topic = TopicTokens,
+            expiry_time = ExpiryTime
+        } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
+        (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
     ]),
     QC = qlc:cursor(QH),
     clear_batch(db_indices(write), QC).
@@ -270,14 +263,6 @@ clean(_) ->
     _ = mria:clear_table(?TAB_INDEX),
     ok.
 
-clean_by_clientid(_, ClientId) ->
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
-        fun(Msg) ->
-            Msg#retained_message.msg#message.from =:= ClientId
-        end
-    ]),
-    ok.
-
 size(_) ->
     table_size().
 

+ 0 - 60
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -626,66 +626,6 @@ t_get_basic_usage_info(_Config) ->
     ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
     ok.
 
-t_banned_clean(_) ->
-    ClientId1 = <<"bc1">>,
-    ClientId2 = <<"bc2">>,
-    {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-
-    {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C2),
-
-    [
-        begin
-            emqtt:publish(
-                Conn,
-                <<"bc/0/", ClientId/binary>>,
-                <<"this is a retained message 0">>,
-                [{qos, 0}, {retain, true}]
-            ),
-            emqtt:publish(
-                Conn,
-                <<"bc/1/", ClientId/binary>>,
-                <<"this is a retained message 1">>,
-                [{qos, 0}, {retain, true}]
-            )
-        end
-     || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
-    ],
-
-    emqtt:publish(
-        C2,
-        <<"bc/2/", ClientId2/binary>>,
-        <<"this is a retained message 2">>,
-        [{qos, 0}, {retain, true}]
-    ),
-
-    timer:sleep(500),
-    {ok, List} = emqx_retainer:page_read(<<"bc/+/+">>, 1, 10),
-    ?assertEqual(5, length(List)),
-
-    Now = erlang:system_time(second),
-    Who = {clientid, ClientId2},
-    emqx_banned:create(#{
-        who => Who,
-        by => <<"test">>,
-        reason => <<"test">>,
-        at => Now,
-        until => Now + 120,
-        clean => true
-    }),
-
-    timer:sleep(500),
-
-    {ok, List2} = emqx_retainer:page_read(<<"bc/#">>, 1, 10),
-    ?assertEqual(2, length(List2)),
-
-    emqx_banned:delete(Who),
-    emqx_retainer:clean(),
-    timer:sleep(500),
-    ok = emqtt:disconnect(C1),
-    ok = emqtt:disconnect(C2).
-
 %% test whether the app can start normally after disabling emqx_retainer
 %% fix: https://github.com/emqx/emqx/pull/8911
 test_disable_then_start(_Config) ->