Browse Source

fix(retainer): fix/add some comment

lafirest 3 years ago
parent
commit
9bdebabdbc

+ 5 - 0
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -3,6 +3,7 @@
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------
 
 
 limiter {
 limiter {
+  ## rate limiter for message publish
   bytes_in {
   bytes_in {
     bucket.default {
     bucket.default {
       aggregated.rate = infinity
       aggregated.rate = infinity
@@ -12,6 +13,7 @@ limiter {
     }
     }
   }
   }
 
 
+  ## rate limiter for message publish
   message_in {
   message_in {
     bucket.default {
     bucket.default {
       aggregated.rate = infinity
       aggregated.rate = infinity
@@ -21,6 +23,7 @@ limiter {
     }
     }
   }
   }
 
 
+  ## connection rate limiter
   connection {
   connection {
     bucket.default {
     bucket.default {
       aggregated.rate = infinity
       aggregated.rate = infinity
@@ -30,6 +33,7 @@ limiter {
     }
     }
   }
   }
 
 
+  ## rate limiter for message deliver
   message_routing {
   message_routing {
     bucket.default {
     bucket.default {
       aggregated.rate = infinity
       aggregated.rate = infinity
@@ -39,6 +43,7 @@ limiter {
     }
     }
   }
   }
 
 
+  ## Some functions that don't need to use global and zone scope, them can shared use this type
   shared {
   shared {
     bucket.retainer {
     bucket.retainer {
       aggregated.rate = infinity
       aggregated.rate = infinity

+ 3 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -68,7 +68,9 @@ fields(limiter) ->
     , {connection, sc(ref(limiter_opts), #{})}
     , {connection, sc(ref(limiter_opts), #{})}
     , {message_routing, sc(ref(limiter_opts), #{})}
     , {message_routing, sc(ref(limiter_opts), #{})}
     , {shared, sc(ref(shared_limiter_opts),
     , {shared, sc(ref(shared_limiter_opts),
-                  #{description => <<"Some functions that do not need to use global and zone scope, them can shared use this type">>})}
+                  #{description =>
+                        <<"Some functions that do not need to use global and zone scope,"
+                          "them can shared use this type">>})}
     ];
     ];
 
 
 fields(limiter_opts) ->
 fields(limiter_opts) ->

+ 4 - 2
apps/emqx_retainer/etc/emqx_retainer.conf

@@ -40,17 +40,19 @@ retainer {
   ## When a client subscribe to a wildcard topic, may many retained messages will be loaded.
   ## When a client subscribe to a wildcard topic, may many retained messages will be loaded.
   ## If you don't want these data loaded to the memory all at once, you can use this to control.
   ## If you don't want these data loaded to the memory all at once, you can use this to control.
   ## The processing flow:
   ## The processing flow:
-  ##   load max_read_number retained message from storage ->
+  ##   load batch_read_number retained message from storage ->
   ##    deliver ->
   ##    deliver ->
   ##    repeat this, until all retianed messages are delivered
   ##    repeat this, until all retianed messages are delivered
   ##
   ##
   flow_control {
   flow_control {
-    ## The messages number per read from storage. 0 means no limit
+    ## The messages batch number per read from storage. 0 means no limit
     ##
     ##
     ## Default: 0
     ## Default: 0
     batch_read_number = 0
     batch_read_number = 0
 
 
     ## The number of retained message can be delivered per batch
     ## The number of retained message can be delivered per batch
+    ## Range: [0, 1000]
+    ## Note: If this value is too large, it may cause difficulty in applying for the token of deliver
     ##
     ##
     ## Default: 0
     ## Default: 0
     batch_deliver_number = 0
     batch_deliver_number = 0

+ 5 - 2
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -22,7 +22,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 
 %% API
 %% API
--export([start_link/2
+-export([ start_link/2
         , dispatch/2
         , dispatch/2
         , refresh_limiter/0
         , refresh_limiter/0
         ]).
         ]).
@@ -241,7 +241,10 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
             do_deliver(ToDelivers, Pid, Topic),
             do_deliver(ToDelivers, Pid, Topic),
             do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
             do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
         {drop, _} = Drop ->
         {drop, _} = Drop ->
-            ?SLOG(error, #{msg => "the retainer deliver failed because the required quota could not be obtained"}),
+            ?SLOG(error, #{msg => "retained_message_dropped",
+                           reason => "reached_ratelimit",
+                           dropped_count => length(ToDelivers)
+                          }),
             Drop
             Drop
     end.
     end.
 
 

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -28,7 +28,7 @@ fields(mnesia_config) ->
 
 
 fields(flow_control) ->
 fields(flow_control) ->
     [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
     [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
-    , {batch_deliver_number, sc(range(0, 50), 0)}
+    , {batch_deliver_number, sc(range(0, 1000), 0)}
     , {limiter_bucket_name, sc(atom(), retainer)}
     , {limiter_bucket_name, sc(atom(), retainer)}
     ].
     ].
 
 

+ 2 - 2
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -57,7 +57,7 @@ init_per_suite(Config) ->
     meck:expect(emqx_alarm, activate, 3, ok),
     meck:expect(emqx_alarm, activate, 3, ok),
     meck:expect(emqx_alarm, deactivate, 3, ok),
     meck:expect(emqx_alarm, deactivate, 3, ok),
 
 
-    base_conf(),
+    load_base_conf(),
     emqx_ratelimiter_SUITE:base_conf(),
     emqx_ratelimiter_SUITE:base_conf(),
     emqx_common_test_helpers:start_apps([emqx_retainer]),
     emqx_common_test_helpers:start_apps([emqx_retainer]),
     Config.
     Config.
@@ -84,7 +84,7 @@ end_per_testcase(_, Config) ->
     end,
     end,
     Config.
     Config.
 
 
-base_conf() ->
+load_base_conf() ->
     ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF).
     ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl

@@ -24,7 +24,7 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_retainer_SUITE:base_conf(),
+    emqx_retainer_SUITE:load_base_conf(),
     %% Meck emqtt
     %% Meck emqtt
     ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
     ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
     %% Start Apps
     %% Start Apps