Browse Source

feat(retainer): add a field `deliver_rate` to limit the maximum delivery rate

firest 2 years ago
parent
commit
03c7b84e89

+ 33 - 2
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -32,7 +32,13 @@
 
 namespace() -> "retainer".
 
-roots() -> ["retainer"].
+roots() ->
+    [
+        {"retainer",
+            hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{
+                converter => fun retainer_converter/2
+            })}
+    ].
 
 fields("retainer") ->
     [
@@ -68,6 +74,11 @@ fields("retainer") ->
                 stop_publish_clear_msg,
                 false
             )},
+        {deliver_rate,
+            ?HOCON(
+                emqx_limiter_schema:rate(),
+                #{required => false, desc => ?DESC(deliver_rate), example => <<"1000/s">>}
+            )},
         {backend, backend_config()}
     ];
 fields(mnesia_config) ->
@@ -97,7 +108,7 @@ fields(flow_control) ->
             )},
         {batch_deliver_number,
             sc(
-                range(0, 1000),
+                non_neg_integer(),
                 batch_deliver_number,
                 0
             )},
@@ -173,3 +184,23 @@ check_duplicate(List) ->
         false -> ?INVALID_SPEC(unique_index_spec_limited);
         true -> ok
     end.
+
+retainer_converter(#{<<"deliver_rate">> := <<"infinity">>} = Conf, _Opts) ->
+    Conf#{
+        <<"flow_control">> => #{
+            <<"batch_read_number">> => 0,
+            <<"batch_deliver_number">> => 0
+        }
+    };
+retainer_converter(#{<<"deliver_rate">> := RateStr} = Conf, _Opts) ->
+    {ok, RateNum} = emqx_limiter_schema:to_rate(RateStr),
+    RawRate = erlang:floor(RateNum * 1000 / emqx_limiter_schema:default_period()),
+    Control = #{
+        <<"batch_read_number">> => RawRate,
+        <<"batch_deliver_number">> => RawRate,
+        %% Set the maximum delivery rate per session
+        <<"batch_deliver_limiter">> => #{<<"client">> => #{<<"rate">> => RateStr}}
+    },
+    Conf#{<<"flow_control">> => Control};
+retainer_converter(Conf, _Opts) ->
+    Conf.

+ 55 - 0
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -706,6 +706,61 @@ t_deliver_when_banned(_) ->
     {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
     ok = emqtt:disconnect(C1).
 
+t_compatibility_for_deliver_rate(_) ->
+    Parser = fun(Conf) ->
+        {ok, RawConf} = hocon:binary(Conf, #{format => map}),
+        hocon_tconf:check_plain(emqx_retainer_schema, RawConf, #{
+            required => false, atom_key => false
+        })
+    end,
+    Infinity = <<"retainer.deliver_rate = \"infinity\"">>,
+    ?assertMatch(
+        #{
+            <<"retainer">> :=
+                #{
+                    <<"flow_control">> := #{
+                        <<"batch_deliver_number">> := 0,
+                        <<"batch_read_number">> := 0,
+                        <<"batch_deliver_limiter">> := #{<<"rate">> := infinity}
+                    }
+                }
+        },
+        Parser(Infinity)
+    ),
+
+    R1 = <<"retainer.deliver_rate = \"1000/s\"">>,
+    ?assertMatch(
+        #{
+            <<"retainer">> :=
+                #{
+                    <<"flow_control">> := #{
+                        <<"batch_deliver_number">> := 1000,
+                        <<"batch_read_number">> := 1000,
+                        <<"batch_deliver_limiter">> := #{<<"client">> := #{<<"rate">> := 100.0}}
+                    }
+                }
+        },
+        Parser(R1)
+    ),
+
+    R2 = <<
+        "retainer{deliver_rate = \"1000/s\"",
+        "flow_control.batch_deliver_limiter.rate = \"500/s\"}"
+    >>,
+    ?assertMatch(
+        #{
+            <<"retainer">> :=
+                #{
+                    <<"flow_control">> := #{
+                        <<"batch_deliver_number">> := 1000,
+                        <<"batch_read_number">> := 1000,
+                        <<"batch_deliver_limiter">> := #{<<"client">> := #{<<"rate">> := 100.0}}
+                    }
+                }
+        },
+        Parser(R2)
+    ).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 2 - 0
rel/i18n/emqx_retainer_schema.hocon

@@ -46,4 +46,6 @@ whether to continue to publish the message.
 See:
 http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038"""
 
+deliver_rate.desc:
+"""The maximum rate of delivering retain messages"""
 }