
Merge pull request #7128 from lafirest/refactor/retainer_flow_control

refactor(emqx_retainer): use hierarchical limiter for the flow control
JianBo He · 3 years ago
commit 123b667eb0

+ 14 - 12
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -3,11 +3,9 @@
 ##--------------------------------------------------------------------
 
 limiter {
+  ## byte rate limiter for message publish
   bytes_in {
-    global.rate = infinity         # token generation rate
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
       aggregated.rate = infinity
       aggregated.capacity = infinity
       per_client.rate = infinity
@@ -15,11 +13,9 @@ limiter {
     }
   }
 
+  ## rate limiter for message publish
   message_in {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
       aggregated.rate = infinity
       aggregated.capacity = infinity
       per_client.rate = infinity
@@ -27,11 +23,9 @@ limiter {
     }
   }
 
+  ## connection rate limiter
   connection {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
       aggregated.rate = infinity
       aggregated.capacity = infinity
       per_client.rate = infinity
@@ -39,11 +33,19 @@ limiter {
     }
   }
 
+  ## rate limiter for message delivery
   message_routing {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
+      aggregated.rate = infinity
+      aggregated.capacity = infinity
+      per_client.rate = infinity
+      per_client.capacity = infinity
+    }
+  }
+
+  ## A limiter type shared by features that do not need global or zone scope
+  shared {
+    bucket.retainer {
       aggregated.rate = infinity
       aggregated.capacity = infinity
       per_client.rate = infinity
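
The new shared type has no global or zone level; a worker simply connects to one of its buckets and consumes tokens from it. A minimal sketch, following the calls used by emqx_retainer_dispatcher later in this diff (the bucket name retainer comes from the config above; the token count 10 is only an example value):

%% Sketch: consuming from the new shared bucket.
Limiter0 = emqx_limiter_server:connect(shared, retainer),
case emqx_htb_limiter:consume(10, Limiter0) of
    {ok, Limiter1}   -> {ok, Limiter1};     %% tokens granted, keep the updated limiter
    {drop, Limiter1} -> {drop, Limiter1}    %% bucket exhausted, caller backs off or drops
end.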

+ 17 - 7
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -27,7 +27,8 @@
 -type limiter_type() :: bytes_in
                       | message_in
                       | connection
-                      | message_routing.
+                      | message_routing
+                      | shared.
 
 -type bucket_name() :: atom().
 -type zone_name() :: atom().
@@ -66,22 +67,31 @@ fields(limiter) ->
     , {message_in, sc(ref(limiter_opts), #{})}
     , {connection, sc(ref(limiter_opts), #{})}
     , {message_routing, sc(ref(limiter_opts), #{})}
+    , {shared, sc(ref(shared_limiter_opts),
+                  #{description =>
+                        <<"Some functions that do not need to use global and zone scope,"
+                          "them can shared use this type">>})}
     ];
 
 fields(limiter_opts) ->
-    [ {global, sc(ref(rate_burst), #{})}
-    , {zone, sc(map("zone name", ref(rate_burst)), #{})}
+    [ {global, sc(ref(rate_burst), #{nullable => true})}
+    , {zone, sc(map("zone name", ref(rate_burst)), #{nullable => true})}
     , {bucket, sc(map("bucket_id", ref(bucket)),
                   #{desc => "Token bucket"})}
     ];
 
+fields(shared_limiter_opts) ->
+    [{bucket, sc(map("bucket_id", ref(bucket)),
+                 #{desc => "Token bucket"})}
+    ];
+
 fields(rate_burst) ->
     [ {rate, sc(rate(), #{})}
     , {burst, sc(burst_rate(), #{default => "0/0s"})}
     ];
 
 fields(bucket) ->
-    [ {zone, sc(atom(), #{desc => "The bucket's zone"})}
+    [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})}
     , {aggregated, sc(ref(bucket_aggregated), #{})}
     , {per_client, sc(ref(client_bucket), #{})}
     ];
@@ -119,14 +129,14 @@ the check/consume will succeed, but it will be forced to wait for a short period
 minimum_period() ->
     100.
 
+to_rate(Str) ->
+    to_rate(Str, true, false).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 ref(Field) -> hoconsc:ref(?MODULE, Field).
 
-to_rate(Str) ->
-    to_rate(Str, true, false).
-
 to_burst_rate(Str) ->
     to_rate(Str, false, true).
 

+ 17 - 8
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -378,11 +378,11 @@ maybe_burst(#{buckets := Buckets,
                          index := Index,
                          zone := Zone} = maps:get(Id, Nodes),
                        case counters:get(Counter, Index) of
-                             Any when Any =< 0 ->
-                                 Group = maps:get(Zone, Groups, []),
-                                 maps:put(Zone, [Id | Group], Groups);
-                             _ ->
-                                 Groups
+                           Any when Any =< 0 ->
+                               Group = maps:get(Zone, Groups, []),
+                               maps:put(Zone, [Id | Group], Groups);
+                           _ ->
+                               Groups
                        end
                end,
 
@@ -451,9 +451,15 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
 
 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 init_tree(Type, State) ->
-    #{global := Global,
-      zone := Zone,
-      bucket := Bucket} = emqx:get_config([limiter, Type]),
+    case emqx:get_config([limiter, Type]) of
+        #{global := Global,
+          zone := Zone,
+          bucket := Bucket} -> ok;
+        #{bucket := Bucket} ->
+            Global = default_rate_burst_cfg(),
+            Zone = #{default => default_rate_burst_cfg()},
+            ok
+    end,
     {Factor, Root} = make_root(Global, Zone),
     State2 = State#{root := Root},
     {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
@@ -592,3 +598,6 @@ get_initial_val(#{initial := Initial,
        true ->
             0
     end.
+
+default_rate_burst_cfg() ->
+    #{rate => infinity, burst => 0}.
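
To make the new clause concrete, here is a sketch of the two config maps init_tree/2 now accepts (the map shapes are assumed from the schema fields and config file in this diff): a full limiter type carries global/zone/bucket, while the shared type carries only buckets, so Global and Zone fall back to default_rate_burst_cfg().

%% Assumed shapes, for illustration only.
Bucket = #{aggregated => #{rate => infinity, capacity => infinity},
           per_client => #{rate => infinity, capacity => infinity}},
Full   = #{global => #{rate => infinity, burst => 0},
           zone   => #{default => #{rate => infinity, burst => 0}},
           bucket => #{default => Bucket}},      %% e.g. message_routing
Shared = #{bucket => #{retainer => Bucket}}.     %% the new shared type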

+ 10 - 1
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -59,7 +59,7 @@ limiter {
       aggregated.capacity = infinity
       per_client.rate = infinity
       per_client.capacity = infinity
-                   }
+    }
   }
 
   message_routing {
@@ -73,6 +73,15 @@ limiter {
       per_client.capacity = infinity
     }
   }
+
+  shared {
+    bucket.retainer {
+      aggregated.rate = infinity
+      aggregated.capacity = infinity
+      per_client.rate = infinity
+      per_client.capacity = infinity
+    }
+  }
 }
 
 """>>).

+ 11 - 9
apps/emqx_retainer/etc/emqx_retainer.conf

@@ -40,25 +40,27 @@ retainer {
  ## When a client subscribes to a wildcard topic, many retained messages may be loaded.
  ## If you don't want all of this data loaded into memory at once, these settings control the flow.
   ## The processing flow:
-  ##   load max_read_number retained message from storage ->
+  ##   load batch_read_number retained messages from storage ->
   ##    deliver ->
  ##    repeat this until all retained messages are delivered
   ##
   flow_control {
-    ## The max messages number per read from storage. 0 means no limit
+    ## The number of messages to read from storage per batch. 0 means no limit
     ##
     ## Default: 0
-    max_read_number = 0
+    batch_read_number = 0
 
-    ## The max number of retained message can be delivered in emqx per quota_release_interval.0 means no limit
+    ## The number of retained messages that can be delivered per batch
+    ## Range: [0, 1000]
+    ## Note: if this value is too large, it may be hard to acquire enough delivery tokens from the limiter
     ##
     ## Default: 0
-    msg_deliver_quota = 0
+    batch_deliver_number = 0
 
-    ## deliver quota reset interval
+    ## The name of the limiter bucket used for message delivery
     ##
     ## Default: 0s
-    quota_release_interval = 0s
+    limiter_bucket_name = retainer
   }
 
   ## Maximum retained message size.
@@ -66,11 +68,11 @@ retainer {
   ## Value: Bytes
   max_payload_size = 1MB
 
-  ## Storage connect parameters
+  ## Storage backend parameters
   ##
   ## Value: built_in_database
   ##
-  config {
+  backend {
 
     type = built_in_database
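
The renamed keys line up with the lookups used elsewhere in this diff; a short sketch of how they are read at runtime:

%% Config lookups matching the renamed keys (as used later in this diff).
BatchReadNum    = emqx:get_config([retainer, flow_control, batch_read_number]),
BatchDeliverNum = emqx:get_config([retainer, flow_control, batch_deliver_number]),
BucketName      = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
BackendCfg      = emqx:get_config([retainer, backend]).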
 

+ 22 - 146
apps/emqx_retainer/src/emqx_retainer.erl

@@ -27,10 +27,10 @@
         , on_message_publish/2
         ]).
 
--export([ dispatch/4
-        , delete_message/2
+-export([ delete_message/2
         , store_retained/2
-        , deliver/5]).
+        , get_backend_module/0
+        ]).
 
 -export([ get_expiry_time/1
         , update_config/1
@@ -54,8 +54,6 @@
                   , context_id := non_neg_integer()
                   , context := undefined | context()
                   , clear_timer := undefined | reference()
-                  , release_quota_timer := undefined | reference()
-                  , wait_quotas := list()
                   }.
 
 -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
@@ -116,45 +114,6 @@ on_message_publish(Msg, _) ->
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
--spec dispatch(context(), pid(), topic(), cursor()) -> ok.
-dispatch(Context, Pid, Topic, Cursor) ->
-    Mod = get_backend_module(),
-    case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
-        false ->
-            {ok, Result} = Mod:read_message(Context, Topic),
-            deliver(Result, Context, Pid, Topic, undefined);
-        true  ->
-            {ok, Result, NewCursor} =  Mod:match_messages(Context, Topic, Cursor),
-            deliver(Result, Context, Pid, Topic, NewCursor)
-    end.
-
-deliver([], Context, Pid, Topic, Cursor) ->
-    case Cursor of
-        undefined ->
-            ok;
-        _ ->
-            dispatch(Context, Pid, Topic, Cursor)
-    end;
-deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
-    case erlang:is_process_alive(Pid) of
-        false ->
-            ok;
-        _ ->
-            #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]),
-            case MaxDeliverNum of
-                0 ->
-                    _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
-                    ok;
-                _ ->
-                    case do_deliver(Result, Id, Pid, Topic) of
-                        ok ->
-                            deliver([], Context, Pid, Topic, Cursor);
-                        abort ->
-                            ok
-                    end
-            end
-    end.
-
 get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
     0;
 get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
@@ -198,7 +157,6 @@ stats_fun() ->
 
 init([]) ->
     emqx_conf:add_handler([retainer], ?MODULE),
-    init_shared_context(),
     State = new_state(),
     #{enable := Enable} = Cfg = emqx:get_config([retainer]),
     {ok,
@@ -213,9 +171,6 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
     State2 = update_config(State, NewConf, OldConf),
     {reply, ok, State2};
 
-handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) ->
-    {noreply, State#{wait_quotas := [{Id, From} | Waits]}};
-
 handle_call(clean, _, #{context := Context} = State) ->
     clean(Context),
     {reply, ok, State};
@@ -249,30 +204,12 @@ handle_info(clear_expired, #{context := Context} = State) ->
     Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
     {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
 
-handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
-    insert_shared_context(?DELIVER_SEMAPHORE, get_msg_deliver_quota()),
-    case Waits of
-        [] ->
-            ok;
-        _ ->
-            #{context_id := NowId} = Context,
-            Waits2 = lists:reverse(Waits),
-            lists:foreach(fun({Id, From}) ->
-                                  gen_server:reply(From, Id =:= NowId)
-                          end,
-                          Waits2)
-    end,
-    Interval = emqx:get_config([retainer, flow_control, quota_release_interval]),
-    {noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
-                     wait_quotas := []}};
-
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
-terminate(_Reason, #{clear_timer := TRef1, release_quota_timer := TRef2}) ->
-    _ = stop_timer(TRef1),
-    _ = stop_timer(TRef2),
+terminate(_Reason, #{clear_timer := ClearTimer}) ->
+    _ = stop_timer(ClearTimer),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -286,22 +223,19 @@ new_state() ->
     #{enable => false,
       context_id => 0,
       context => undefined,
-      clear_timer => undefined,
-      release_quota_timer => undefined,
-      wait_quotas => []}.
+      clear_timer => undefined
+     }.
 
 -spec new_context(pos_integer()) -> context().
 new_context(Id) ->
     #{context_id => Id}.
 
-
 payload_size_limit() ->
     emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE).
 
 %% @private
 dispatch(Context, Topic) ->
-    emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4,
-                                    [Context, self(), Topic, undefined]).
+    emqx_retainer_dispatcher:dispatch(Context, Topic).
 
 -spec delete_message(context(), topic()) -> ok.
 delete_message(Context, Topic) ->
@@ -328,53 +262,6 @@ clean(Context) ->
     Mod = get_backend_module(),
     Mod:clean(Context).
 
--spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort.
-do_deliver([Msg | T], Id, Pid, Topic) ->
-    case require_semaphore(?DELIVER_SEMAPHORE, Id) of
-        true ->
-            Pid ! {deliver, Topic, Msg},
-            do_deliver(T, Id, Pid, Topic);
-        _ ->
-            abort
-    end;
-do_deliver([], _, _, _) ->
-    ok.
-
--spec require_semaphore(semaphore(), pos_integer()) -> boolean().
-require_semaphore(Semaphore, Id) ->
-    Remained = ets:update_counter(?SHARED_CONTEXT_TAB,
-                                  Semaphore,
-                                  {#shared_context.value, -1, -1, -1}),
-    wait_semaphore(Remained, Id).
-
--spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
-wait_semaphore(X, Id) when X < 0 ->
-    call({?FUNCTION_NAME, Id});
-wait_semaphore(_, _) ->
-    true.
-
--spec init_shared_context() -> ok.
-init_shared_context() ->
-    ?SHARED_CONTEXT_TAB = ets:new(?SHARED_CONTEXT_TAB,
-                                  [ set, named_table, public
-                                  , {keypos, #shared_context.key}
-                                  , {write_concurrency, true}
-                                  , {read_concurrency, true}]),
-    lists:foreach(fun({K, V}) ->
-                          insert_shared_context(K, V)
-                  end,
-                  [{?DELIVER_SEMAPHORE, get_msg_deliver_quota()}]).
-
-
--spec insert_shared_context(shared_context_key(), term()) -> ok.
-insert_shared_context(Key, Term) ->
-    ets:insert(?SHARED_CONTEXT_TAB, #shared_context{key = Key, value = Term}),
-    ok.
-
--spec get_msg_deliver_quota() -> non_neg_integer().
-get_msg_deliver_quota() ->
-    emqx:get_config([retainer, flow_control, msg_deliver_quota]).
-
 -spec update_config(state(), hocons:config(), hocons:config()) -> state().
 update_config(State, Conf, OldConf) ->
     update_config(maps:get(enable, Conf),
@@ -391,24 +278,19 @@ update_config(true, false, State, NewConf, _) ->
     enable_retainer(State, NewConf);
 
 update_config(true, true,
-              #{clear_timer := ClearTimer,
-                release_quota_timer := QuotaTimer} = State, NewConf, OldConf) ->
-    #{config := Cfg,
-      flow_control := #{quota_release_interval := QuotaInterval},
+              #{clear_timer := ClearTimer} = State, NewConf, OldConf) ->
+    #{backend := BackendCfg,
       msg_clear_interval := ClearInterval} = NewConf,
 
-    #{config := OldCfg} = OldConf,
+    #{backend := OldBackendCfg} = OldConf,
 
-    StorageType = maps:get(type, Cfg),
-    OldStrorageType = maps:get(type, OldCfg),
+    StorageType = maps:get(type, BackendCfg),
+    OldStrorageType = maps:get(type, OldBackendCfg),
     case OldStrorageType of
         StorageType ->
             State#{clear_timer := check_timer(ClearTimer,
                                               ClearInterval,
-                                              clear_expired),
-                   release_quota_timer := check_timer(QuotaTimer,
-                                                      QuotaInterval,
-                                                      release_deliver_quota)};
+                                              clear_expired)};
         _ ->
             State2 = disable_retainer(State),
             enable_retainer(State2, NewConf)
@@ -417,29 +299,23 @@ update_config(true, true,
 -spec enable_retainer(state(), hocon:config()) -> state().
 enable_retainer(#{context_id := ContextId} = State,
                 #{msg_clear_interval := ClearInterval,
-                  flow_control := #{quota_release_interval := ReleaseInterval},
-                  config := Config}) ->
+                  backend := BackendCfg}) ->
     NewContextId = ContextId + 1,
-    Context = create_resource(new_context(NewContextId), Config),
+    Context = create_resource(new_context(NewContextId), BackendCfg),
     load(Context),
     State#{enable := true,
            context_id := NewContextId,
            context := Context,
-           clear_timer := add_timer(ClearInterval, clear_expired),
-           release_quota_timer := add_timer(ReleaseInterval, release_deliver_quota)}.
+           clear_timer := add_timer(ClearInterval, clear_expired)}.
 
 -spec disable_retainer(state()) -> state().
-disable_retainer(#{clear_timer := TRef1,
-                   release_quota_timer := TRef2,
-                   context := Context,
-                   wait_quotas := Waits} = State) ->
+disable_retainer(#{clear_timer := ClearTimer,
+                   context := Context} = State) ->
     unload(),
-    ok = lists:foreach(fun(E) -> gen_server:reply(E, false) end, Waits),
     ok = close_resource(Context),
     State#{enable := false,
-           clear_timer := stop_timer(TRef1),
-           release_quota_timer := stop_timer(TRef2),
-           wait_quotas := []}.
+           clear_timer := stop_timer(ClearTimer)
+          }.
 
 -spec stop_timer(undefined | reference()) -> undefined.
 stop_timer(undefined) ->
@@ -466,7 +342,7 @@ check_timer(Timer, _, _) ->
 
 -spec get_backend_module() -> backend().
 get_backend_module() ->
-    ModName = case emqx:get_config([retainer, config]) of
+    ModName = case emqx:get_config([retainer, backend]) of
                   #{type := built_in_database} -> mnesia;
                   #{type := Backend} -> Backend
               end,

+ 104 - 18
apps/emqx_retainer/src/emqx_retainer_pool.erl

@@ -14,27 +14,41 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_retainer_pool).
+-module(emqx_retainer_dispatcher).
 
 -behaviour(gen_server).
 
+-include("emqx_retainer.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 %% API
--export([start_link/2,
-         async_submit/2]).
+-export([ start_link/2
+        , dispatch/2
+        , refresh_limiter/0
+        ]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3, format_status/2]).
 
+-type limiter() :: emqx_htb_limiter:limiter().
+
 -define(POOL, ?MODULE).
 
 %%%===================================================================
 %%% API
 %%%===================================================================
-async_submit(Fun, Args) ->
-    cast({async_submit, {Fun, Args}}).
+dispatch(Context, Topic) ->
+    cast({?FUNCTION_NAME, Context, self(), Topic}).
+
+%% Sometimes it is necessary to reset the client's limiter after the limiter config is updated.
+%% A limiter update handler may be added later; for now this is a workaround.
+refresh_limiter() ->
+    Workers = gproc_pool:active_workers(?POOL),
+    lists:foreach(fun({_, Pid}) ->
+                          gen_server:cast(Pid, ?FUNCTION_NAME)
+                  end,
+                  Workers).
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -66,7 +80,9 @@ start_link(Pool, Id) ->
           ignore.
 init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    {ok, #{pool => Pool, id => Id}}.
+    Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
+    Limiter = emqx_limiter_server:connect(shared, Bucket),
+    {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -98,12 +114,14 @@ handle_call(Req, _From, State) ->
           {noreply, NewState :: term(), Timeout :: timeout()} |
           {noreply, NewState :: term(), hibernate} |
           {stop, Reason :: term(), NewState :: term()}.
-handle_cast({async_submit, Task}, State) ->
-    try run(Task)
-    catch _:Error:Stacktrace ->
-            ?SLOG(error, #{msg => "crashed_handling_async_task", exception => Error, stacktrace => Stacktrace})
-    end,
-    {noreply, State};
+handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
+    {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
+    {noreply, State#{limiter := Limiter2}};
+
+handle_cast(refresh_limiter, State) ->
+    Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
+    Limiter = emqx_limiter_server:connect(shared, Bucket),
+    {noreply, State#{limiter := Limiter}};
 
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
@@ -174,9 +192,77 @@ cast(Msg) ->
 worker() ->
     gproc_pool:pick_worker(?POOL, self()).
 
-run({M, F, A}) ->
-    erlang:apply(M, F, A);
-run({F, A}) when is_function(F), is_list(A) ->
-    erlang:apply(F, A);
-run(Fun) when is_function(Fun) ->
-    Fun().
+-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
+dispatch(Context, Pid, Topic, Cursor, Limiter) ->
+    Mod = emqx_retainer:get_backend_module(),
+    case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
+        false ->
+            {ok, Result} = Mod:read_message(Context, Topic),
+            deliver(Result, Context, Pid, Topic, undefined, Limiter);
+        true  ->
+            {ok, Result, NewCursor} =  Mod:match_messages(Context, Topic, Cursor),
+            deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
+    end.
+
+-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
+deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
+    {ok, Limiter};
+
+deliver([], Context, Pid, Topic, Cursor, Limiter) ->
+    dispatch(Context, Pid, Topic, Cursor, Limiter);
+
+deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
+    case erlang:is_process_alive(Pid) of
+        false ->
+            {ok, Limiter};
+        _ ->
+            DeliverNum = emqx:get_config([retainer, flow_control, batch_deliver_number]),
+            case DeliverNum of
+                0 ->
+                    do_deliver(Result, Pid, Topic),
+                    {ok, Limiter};
+                _ ->
+                    case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
+                        {ok, Limiter2} ->
+                            deliver([], Context, Pid, Topic, Cursor, Limiter2);
+                        {drop, Limiter2} ->
+                            {ok, Limiter2}
+                    end
+            end
+    end.
+
+do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
+    {ok, Limiter};
+
+do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
+    {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
+    case emqx_htb_limiter:consume(Num, Limiter) of
+        {ok, Limiter2} ->
+            do_deliver(ToDelivers, Pid, Topic),
+            do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
+        {drop, _} = Drop ->
+            ?SLOG(error, #{msg => "retained_message_dropped",
+                           reason => "reached_ratelimit",
+                           dropped_count => length(ToDelivers)
+                          }),
+            Drop
+    end.
+
+do_deliver([Msg | T], Pid, Topic) ->
+    Pid ! {deliver, Topic, Msg},
+    do_deliver(T, Pid, Topic);
+
+do_deliver([], _, _) ->
+    ok.
+
+safe_split(N, List) ->
+    safe_split(N, List, 0, []).
+
+safe_split(0, List, Count, Acc) ->
+    {Count, lists:reverse(Acc), List};
+
+safe_split(_N, [], Count, Acc) ->
+    {Count, lists:reverse(Acc), []};
+
+safe_split(N, [H | T], Count, Acc) ->
+    safe_split(N - 1, T, Count + 1, [H | Acc]).
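
For reference, safe_split/2 takes at most N elements from the head of the list and also returns how many it actually took, so the final short batch only consumes tokens for the messages it actually contains. Worked examples, derived from the clauses above:

%% Illustrative results of safe_split/2:
{2, [a, b], [c]}  = safe_split(2, [a, b, c]),   %% enough elements: take N
{2, [a, b], []}   = safe_split(5, [a, b]),      %% short list: take what exists
{0, [], [a, b]}   = safe_split(0, [a, b]).      %% N = 0: nothing taken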

+ 5 - 5
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -144,17 +144,17 @@ page_read(_, Topic, Page, Limit) ->
     {ok, Rows}.
 
 match_messages(_, Topic, Cursor) ->
-    MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]),
+    BatchReadNum = emqx:get_config([retainer, flow_control, batch_read_number]),
     case Cursor of
         undefined ->
-            case MaxReadNum of
+            case BatchReadNum of
                 0 ->
                     {ok, sort_retained(match_messages(Topic)), undefined};
                 _ ->
-                    start_batch_read(Topic, MaxReadNum)
+                    start_batch_read(Topic, BatchReadNum)
             end;
         _ ->
-            batch_read_messages(Cursor, MaxReadNum)
+            batch_read_messages(Cursor, BatchReadNum)
     end.
 
 clean(_) ->
@@ -253,7 +253,7 @@ make_cursor(Topic) ->
 
 -spec is_table_full() -> boolean().
 is_table_full() ->
-    #{max_retained_messages := Limit} = emqx:get_config([retainer, config]),
+    Limit = emqx:get_config([retainer, backend, max_retained_messages]),
     Limit > 0 andalso (table_size() >= Limit).
 
 -spec table_size() -> non_neg_integer().

+ 5 - 5
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -17,7 +17,7 @@ fields("retainer") ->
     , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
     , {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
     , {stop_publish_clear_msg, sc(boolean(), false)}
-    , {config, config()}
+    , {backend, backend_config()}
     ];
 
 fields(mnesia_config) ->
@@ -27,9 +27,9 @@ fields(mnesia_config) ->
     ];
 
 fields(flow_control) ->
-    [ {max_read_number, sc(integer(), 0, fun is_pos_integer/1)}
-    , {msg_deliver_quota, sc(integer(), 0, fun is_pos_integer/1)}
-    , {quota_release_interval, sc(emqx_schema:duration_ms(), "0ms")}
+    [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
+    , {batch_deliver_number, sc(range(0, 1000), 0)}
+    , {limiter_bucket_name, sc(atom(), retainer)}
     ].
 
 %%--------------------------------------------------------------------
@@ -45,5 +45,5 @@ sc(Type, Default, Validator) ->
 is_pos_integer(V) ->
     V >= 0.
 
-config() ->
+backend_config() ->
     #{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}.
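
With the defaults declared above, an unconfigured flow_control section would resolve to roughly the following map (an assumed sketch of the parsed config, not output from the schema itself):

%% Assumed parsed defaults of the new flow_control fields (sketch only).
#{batch_read_number    => 0,
  batch_deliver_number => 0,
  limiter_bucket_name  => retainer}.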

+ 2 - 2
apps/emqx_retainer/src/emqx_retainer_sup.erl

@@ -26,8 +26,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    PoolSpec = emqx_pool_sup:spec([emqx_retainer_pool, hash, emqx_vm:schedulers(),
-                                   {emqx_retainer_pool, start_link, []}]),
+    PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(),
+                                   {emqx_retainer_dispatcher, start_link, []}]),
     {ok, {{one_for_one, 10, 3600},
           [#{id       => retainer,
              start    => {emqx_retainer, start_link, []},

+ 37 - 15
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -34,16 +34,16 @@ retainer {
     msg_expiry_interval = 0s
     max_payload_size = 1MB
     flow_control {
-        max_read_number = 0
-        msg_deliver_quota = 0
-        quota_release_interval = 0s
-    }
-    config {
+        batch_read_number = 0
+        batch_deliver_number = 0
+        limiter_bucket_name = retainer
+    }
+    backend {
         type = built_in_database
         storage_type = ram
         max_retained_messages = 0
-        }
-  }""">>).
+    }
+}""">>).
 
 %%--------------------------------------------------------------------
 %% Setups
@@ -57,7 +57,8 @@ init_per_suite(Config) ->
     meck:expect(emqx_alarm, activate, 3, ok),
     meck:expect(emqx_alarm, deactivate, 3, ok),
 
-    ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF),
+    load_base_conf(),
+    emqx_ratelimiter_SUITE:base_conf(),
     emqx_common_test_helpers:start_apps([emqx_retainer]),
     Config.
 
@@ -83,6 +84,9 @@ end_per_testcase(_, Config) ->
     end,
     Config.
 
+load_base_conf() ->
+    ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF).
+
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------
@@ -282,10 +286,20 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).
 
 t_flow_control(_) ->
+    #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]),
+    RetainerCfg2 = RetainerCfg#{per_client := PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
+                                                         capacity := 1}},
+    emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2),
+    emqx_limiter_manager:restart_server(shared),
+    timer:sleep(500),
+
+    emqx_retainer_dispatcher:refresh_limiter(),
+    timer:sleep(500),
+
     emqx_retainer:update_config(#{<<"flow_control">> =>
-                                  #{<<"max_read_number">> => 1,
-                                    <<"msg_deliver_quota">> => 1,
-                                    <<"quota_release_interval">> => <<"1s">>}}),
+                                      #{<<"batch_read_number">> => 1,
+                                        <<"batch_deliver_number">> => 1,
+                                        <<"limiter_bucket_name">> => retainer}}),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(
@@ -309,11 +323,19 @@ t_flow_control(_) ->
     End = erlang:system_time(millisecond),
     Diff = End - Begin,
 
-    %% msg_deliver_quota = 1 and quota_release_interval = 1, and there has three message
-    %% so total wait time is between in 1 ~ 2s(may be timer will delay, so plus 0.5s to maximum)
-    ?assert(Diff > timer:seconds(1) andalso Diff < timer:seconds(2.5)),
+    ?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
+            lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))),
 
-    ok = emqtt:disconnect(C1).
+    ok = emqtt:disconnect(C1),
+
+    %% recover the limiter
+    emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg),
+    emqx_limiter_manager:restart_server(shared),
+    timer:sleep(500),
+
+    emqx_retainer_dispatcher:refresh_limiter(),
+    timer:sleep(500),
+    ok.
 
 %%--------------------------------------------------------------------
 %% Helper functions

+ 1 - 19
apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl

@@ -21,28 +21,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
--define(BASE_CONF, <<"""
-retainer {
-    enable = true
-    msg_clear_interval = 0s
-    msg_expiry_interval = 0s
-    max_payload_size = 1MB
-    flow_control {
-        max_read_number = 0
-        msg_deliver_quota = 0
-        quota_release_interval = 0s
-    }
-    config {
-        type = built_in_database
-        storage_type = ram
-        max_retained_messages = 0
-        }
-  }""">>).
-
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF),
+    emqx_retainer_SUITE:load_base_conf(),
     %% Meck emqtt
     ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
     %% Start Apps