Sfoglia il codice sorgente

Merge pull request #8211 from lafirest/fix/limiter_infinity_update

fix(limiter): set maximum value for `infinity` rate and capacity
JianBo He 3 anni fa
parent
commit
89ff67a1e6

+ 0 - 5
apps/emqx/etc/emqx.conf

@@ -1,13 +1,11 @@
 listeners.tcp.default {
   bind = "0.0.0.0:1883"
   max_connections = 1024000
-  limiter.connection = default
 }
 
 listeners.ssl.default {
   bind = "0.0.0.0:8883"
   max_connections = 512000
-  limiter.connection = default
   ssl_options {
     keyfile = "{{ platform_etc_dir }}/certs/key.pem"
     certfile = "{{ platform_etc_dir }}/certs/cert.pem"
@@ -18,14 +16,12 @@ listeners.ssl.default {
 listeners.ws.default {
   bind = "0.0.0.0:8083"
   max_connections = 1024000
-  limiter.connection = default
   websocket.mqtt_path = "/mqtt"
 }
 
 listeners.wss.default {
   bind = "0.0.0.0:8084"
   max_connections = 512000
-  limiter.connection = default
   websocket.mqtt_path = "/mqtt"
   ssl_options {
     keyfile = "{{ platform_etc_dir }}/certs/key.pem"
@@ -38,7 +34,6 @@ listeners.wss.default {
 #  enabled = false
 #  bind = "0.0.0.0:14567"
 #  max_connections = 1024000
-#  limiter.connection = default
 #  keyfile = "{{ platform_etc_dir }}/certs/key.pem"
 #  certfile = "{{ platform_etc_dir }}/certs/cert.pem"
 #}

+ 3 - 4
apps/emqx/src/emqx_connection.erl

@@ -956,7 +956,7 @@ check_limiter(
                 {ok, Limiter2} ->
                     WhenOk(Data, Msgs, State#state{limiter = Limiter2});
                 {pause, Time, Limiter2} ->
-                    ?SLOG(warning, #{
+                    ?SLOG(debug, #{
                         msg => "pause_time_dueto_rate_limit",
                         needs => Needs,
                         time_in_ms => Time
@@ -982,8 +982,7 @@ check_limiter(
         _ ->
             %% if there has a retry timer,
             %% cache the operation and execute it after the retry is over
-            %% TODO: maybe we need to set socket to passive if size of queue is very large
-            %% because we queue up lots of ops that checks with the limiters.
+            %% the maximum length of the cache queue is equal to the active_n
             New = #cache{need = Needs, data = Data, next = WhenOk},
             {ok, State#state{limiter_cache = queue:in(New, Cache)}}
     end;
@@ -1006,7 +1005,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
                 }
             );
         {pause, Time, Limiter2} ->
-            ?SLOG(warning, #{
+            ?SLOG(debug, #{
                 msg => "pause_time_dueto_rate_limit",
                 types => Types,
                 time_in_ms => Time

+ 0 - 11
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -1,11 +0,0 @@
-limiter {
-  connection {
-    rate = "1000/s"
-    bucket {
-      default {
-        rate = "1000/s"
-        capacity = 1000
-      }
-    }
-  }
-}

+ 1 - 7
apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl

@@ -50,13 +50,7 @@
 %%--------------------------------------------------------------------
 %%  API
 %%--------------------------------------------------------------------
--spec new(
-    undefined | counters:countres_ref(),
-    undefined | index(),
-    rate()
-) -> bucket_ref().
-new(undefined, _, _) ->
-    infinity;
+-spec new(counters:countres_ref(), index(), rate()) -> bucket_ref().
 new(Counter, Index, Rate) ->
     #{
         counter => Counter,

+ 31 - 4
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -30,7 +30,8 @@
     namespace/0,
     get_bucket_cfg_path/2,
     desc/1,
-    types/0
+    types/0,
+    infinity_value/0
 ]).
 
 -define(KILOBYTE, 1024).
@@ -46,7 +47,7 @@
 -type rate() :: infinity | float().
 -type burst_rate() :: 0 | float().
 %% the capacity of the token bucket
--type capacity() :: infinity | number().
+-type capacity() :: non_neg_integer().
 %% initial capacity of the token bucket
 -type initial() :: non_neg_integer().
 -type bucket_path() :: list(atom()).
@@ -88,7 +89,7 @@ fields(limiter) ->
         {Type,
             ?HOCON(?R_REF(limiter_opts), #{
                 desc => ?DESC(Type),
-                default => #{}
+                default => make_limiter_default(Type)
             })}
      || Type <- types()
     ];
@@ -207,6 +208,18 @@ types() ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+%% `infinity` to `infinity_value` rules:
+%% 1. all infinity capacity will change to infinity_value
+%% 2. if the rate of global and bucket  both are `infinity`,
+%%    use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2`
+infinity_value() ->
+    %% 1 TB
+    1099511627776.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
 to_burst_rate(Str) ->
     to_rate(Str, false, true).
 
@@ -294,7 +307,7 @@ to_quota(Str, Regex) ->
             {match, [Quota, ""]} ->
                 {ok, erlang:list_to_integer(Quota)};
             {match, ""} ->
-                {ok, infinity};
+                {ok, infinity_value()};
             _ ->
                 {error, Str}
         end
@@ -308,3 +321,17 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE;
 apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
 apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
 apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
+
+make_limiter_default(connection) ->
+    #{
+        <<"rate">> => <<"1000/s">>,
+        <<"bucket">> => #{
+            <<"default">> =>
+                #{
+                    <<"rate">> => <<"1000/s">>,
+                    <<"capacity">> => 1000
+                }
+        }
+    };
+make_limiter_default(_) ->
+    #{}.

+ 28 - 36
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -112,24 +112,26 @@
 %% If no bucket path is set in config, there will be no limit
 connect(_Type, undefined) ->
     {ok, emqx_htb_limiter:make_infinity_limiter()};
+%% Workaround.
+%% After API updated some config, the bucket name maybe become ‘’ (converted from empty binary)
+connect(_Type, '') ->
+    {ok, emqx_htb_limiter:make_infinity_limiter()};
 connect(Type, BucketName) when is_atom(BucketName) ->
     case get_bucket_cfg(Type, BucketName) of
         undefined ->
             ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
             {error, config_not_found};
         #{
-            rate := AggrRate,
-            capacity := AggrSize,
+            rate := BucketRate,
+            capacity := BucketSize,
             per_client := #{rate := CliRate, capacity := CliSize} = Cfg
         } ->
             case emqx_limiter_manager:find_bucket(Type, BucketName) of
                 {ok, Bucket} ->
                     {ok,
                         if
-                            CliRate < AggrRate orelse CliSize < AggrSize ->
+                            CliRate < BucketRate orelse CliSize < BucketSize ->
                                 emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
-                            Bucket =:= infinity ->
-                                emqx_htb_limiter:make_infinity_limiter();
                             true ->
                                 emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                         end};
@@ -372,9 +374,6 @@ longitudinal(
 
     case lists:min([ShouldAlloc, Flow, Capacity]) of
         Available when Available > 0 ->
-            %% XXX if capacity is infinity, and flow always > 0, the value in
-            %% counter will be overflow at some point in the future, do we need
-            %% to deal with this situation???
             {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
             counters:add(Counter, Index, Inc),
 
@@ -491,26 +490,14 @@ make_root(#{rate := Rate, burst := Burst}) ->
 
 make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
     Path = emqx_limiter_manager:make_path(Type, Name),
-    case get_counter_rate(Conf, GlobalCfg) of
-        infinity ->
-            Rate = infinity,
-            Capacity = infinity,
-            Initial = 0,
-            Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
-            emqx_limiter_manager:insert_bucket(Path, Ref),
-            CounterNum2 = CounterNum,
-            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-                State#{buckets := Buckets#{BucketName => Bucket}}
-            end;
-        Rate ->
-            #{capacity := Capacity} = Conf,
-            Initial = get_initial_val(Conf),
-            CounterNum2 = CounterNum + 1,
-            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-                {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
-                Bucket2 = Bucket#{counter := Counter, index := Idx},
-                State2#{buckets := Buckets#{BucketName => Bucket2}}
-            end
+    Rate = get_counter_rate(Conf, GlobalCfg),
+    #{capacity := Capacity} = Conf,
+    Initial = get_initial_val(Conf),
+    CounterNum2 = CounterNum + 1,
+    InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
+        {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
+        Bucket2 = Bucket#{counter := Counter, index := Idx},
+        State2#{buckets := Buckets#{BucketName => Bucket2}}
     end,
 
     Bucket = #{
@@ -569,22 +556,27 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
 %% @doc find first limited node
 get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity ->
     Rate;
-get_counter_rate(_Cfg, #{rate := Rate}) ->
-    Rate.
+get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity ->
+    Rate;
+get_counter_rate(_Cfg, _GlobalCfg) ->
+    emqx_limiter_schema:infinity_value().
 
 -spec get_initial_val(hocons:config()) -> decimal().
-get_initial_val(#{
-    initial := Initial,
-    rate := Rate,
-    capacity := Capacity
-}) ->
+get_initial_val(
+    #{
+        initial := Initial,
+        rate := Rate,
+        capacity := Capacity
+    }
+) ->
     %% initial will nevner be infinity(see the emqx_limiter_schema)
+    InfVal = emqx_limiter_schema:infinity_value(),
     if
         Initial > 0 ->
             Initial;
         Rate =/= infinity ->
             erlang:min(Rate, Capacity);
-        Capacity =/= infinity ->
+        Capacity =/= infinity andalso Capacity =/= InfVal ->
             Capacity;
         true ->
             0

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -1614,7 +1614,7 @@ base_listener(Bind) ->
                 map("ratelimit_name", emqx_limiter_schema:bucket_name()),
                 #{
                     desc => ?DESC(base_listener_limiter),
-                    default => #{}
+                    default => #{<<"connection">> => <<"default">>}
                 }
             )}
     ].

+ 2 - 2
apps/emqx/src/emqx_ws_connection.erl

@@ -586,7 +586,7 @@ check_limiter(
                 {ok, Limiter2} ->
                     WhenOk(Data, Msgs, State#state{limiter = Limiter2});
                 {pause, Time, Limiter2} ->
-                    ?SLOG(warning, #{
+                    ?SLOG(debug, #{
                         msg => "pause_time_due_to_rate_limit",
                         needs => Needs,
                         time_in_ms => Time
@@ -634,7 +634,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
                 }
             );
         {pause, Time, Limiter2} ->
-            ?SLOG(warning, #{
+            ?SLOG(debug, #{
                 msg => "pause_time_due_to_rate_limit",
                 types => Types,
                 time_in_ms => Time

+ 4 - 0
apps/emqx/test/emqx_channel_SUITE.erl

@@ -468,6 +468,8 @@ t_handle_in_qos1_publish(_) ->
 t_handle_in_qos2_publish(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end),
     Channel = channel(#{conn_state => connected, session => session()}),
+    %% waiting limiter server
+    timer:sleep(200),
     Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
     {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
         emqx_channel:handle_in(Publish1, Channel),
@@ -482,6 +484,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
     Channel = channel(#{conn_state => connected, session => Session}),
+    %% waiting limiter server
+    timer:sleep(200),
     Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
     {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
         emqx_channel:handle_in(Publish1, Channel),

+ 3 - 2
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -246,7 +246,8 @@ t_infinity_client(_) ->
     end,
     Case = fun() ->
         Client = connect(default),
-        ?assertEqual(infinity, Client),
+        InfVal = emqx_limiter_schema:infinity_value(),
+        ?assertMatch(#{bucket := #{rate := InfVal}}, Client),
         Result = emqx_htb_limiter:check(100000, Client),
         ?assertEqual({ok, Client}, Result)
     end,
@@ -596,7 +597,7 @@ t_schema_unit(_) ->
     ?assertMatch({error, _}, M:to_rate("100MB/1")),
     ?assertMatch({error, _}, M:to_rate("100/10x")),
 
-    ?assertEqual({ok, infinity}, M:to_capacity("infinity")),
+    ?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")),
     ?assertEqual({ok, 100}, M:to_capacity("100")),
     ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
     ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),

+ 1 - 0
apps/emqx_retainer/src/emqx_retainer.erl

@@ -201,6 +201,7 @@ init([]) ->
 
 handle_call({update_config, NewConf, OldConf}, _, State) ->
     State2 = update_config(State, NewConf, OldConf),
+    emqx_retainer_dispatcher:refresh_limiter(NewConf),
     {reply, ok, State2};
 handle_call(clean, _, #{context := Context} = State) ->
     clean(Context),

+ 10 - 6
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -26,6 +26,7 @@
     start_link/2,
     dispatch/2,
     refresh_limiter/0,
+    refresh_limiter/1,
     wait_dispatch_complete/1,
     worker/0
 ]).
@@ -51,13 +52,16 @@
 dispatch(Context, Topic) ->
     cast({?FUNCTION_NAME, Context, self(), Topic}).
 
-%% sometimes it is necessary to reset the client's limiter after updated the limiter's config
-%% an limiter update handler maybe added later, now this is a workaround
+%% reset the client's limiter after updated the limiter's config
 refresh_limiter() ->
+    Conf = emqx:get_config([retainer]),
+    refresh_limiter(Conf).
+
+refresh_limiter(Conf) ->
     Workers = gproc_pool:active_workers(?POOL),
     lists:foreach(
         fun({_, Pid}) ->
-            gen_server:cast(Pid, ?FUNCTION_NAME)
+            gen_server:cast(Pid, {?FUNCTION_NAME, Conf})
         end,
         Workers
     ).
@@ -150,8 +154,8 @@ handle_call(Req, _From, State) ->
 handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
     {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
     {noreply, State#{limiter := Limiter2}};
-handle_cast(refresh_limiter, State) ->
-    BucketName = emqx_conf:get([retainer, flow_control, batch_deliver_limiter]),
+handle_cast({refresh_limiter, Conf}, State) ->
+    BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
     {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
     {noreply, State#{limiter := Limiter}};
 handle_cast(Msg, State) ->
@@ -273,7 +277,7 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
             do_deliver(ToDelivers, Pid, Topic),
             do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
         {drop, _} = Drop ->
-            ?SLOG(error, #{
+            ?SLOG(debug, #{
                 msg => "retained_message_dropped",
                 reason => "reached_ratelimit",
                 dropped_count => length(ToDelivers)