Przeglądaj źródła

feat(emqx_limiter): use hierarchical limiter for esockd

lafirest 4 lat temu
rodzic
commit
be2c36cfa0

+ 0 - 25
apps/emqx/etc/emqx.conf

@@ -960,29 +960,6 @@ rate_limit {
   conn_bytes_in = "100KB,10s"
 }
 
-quota {
-  ## Messages quota for the each of external MQTT connection.
-  ## This value consumed by the number of recipient on a message.
-  ##
-  ## @doc quota.conn_messages_routing
-  ## ValueType: String | infinity
-  ## Default: infinity
-  ## Examples: 100 messaegs per 1s:
-  ##   quota.conn_messages_routing: "100,1s"
-  conn_messages_routing = "100,1s"
-
-  ## Messages quota for the all of external MQTT connections.
-  ## This value consumed by the number of recipient on a message.
-  ##
-  ## @doc quota.overall_messages_routing
-  ## ValueType: String | infinity
-  ## Default: infinity
-  ## Examples: 200000 messages per 1s:
-  ##    quota.overall_messages_routing: "200000,1s"
-  ##
-  overall_messages_routing = "200000,1s"
-}
-
 ##==================================================================
 ## Zones
 ##==================================================================
@@ -1020,8 +997,6 @@ quota {
 ##   - `flapping_detect.*`
 ##   - `force_shutdown.*`
 ##   - `conn_congestion.*`
-##   - `rate_limit.*`
-##   - `quota.*`
 ##   - `force_gc.*`
 ##
 ## syntax: zones.<zone-name>

+ 1 - 1
apps/emqx/rebar.config

@@ -16,7 +16,7 @@
     , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
-    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
+    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.24.0"}}}

+ 58 - 0
apps/emqx/src/emqx_limiter/src/emqx_htb_generic.erl

@@ -0,0 +1,58 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_htb_generic).
+
+-behaviour(esockd_generic_limiter).
+
+%% API
+-export([new_create_options/2, create/1, delete/1, consume/2]).
+
+-type create_options() :: #{ module := emqx_htb_generic
+                           , type := emqx_limiter_schema:limiter_type()
+                           , bucket := emqx_limiter_schema:bucket_name()
+                           }.
+
+%%--------------------------------------------------------------------
+%%  API
+%%--------------------------------------------------------------------
+
+-spec new_create_options(emqx_limiter_schema:limiter_type(),
+                         emqx_limiter_schema:bucket_name()) -> create_options().
+new_create_options(Type, BucketName) ->
+    #{module => ?MODULE, type => Type, bucket => BucketName}.
+
+-spec create(create_options()) -> esockd_generic_limiter:limiter().
+create(#{module := ?MODULE, type := Type, bucket := BucketName}) ->
+    Limiter = emqx_limiter_server:connect(Type, BucketName),
+    #{module => ?MODULE, name => Type, limiter => Limiter}.
+
+delete(_GLimiter) ->
+    ok.
+
+consume(Token, #{limiter := Limiter} = GLimiter) ->
+    case emqx_htb_limiter:check(Token, Limiter) of
+        {ok, Limiter2} ->
+            {ok, GLimiter#{limiter := Limiter2}};
+        {pause, Ms, Retry, Limiter2} ->
+            {pause, Ms, GLimiter#{limiter := emqx_htb_limiter:set_retry(Retry, Limiter2)}};
+        {drop, Limiter2} ->
+            {ok, GLimiter#{limiter := Limiter2}}
+    end.
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------

+ 7 - 4
apps/emqx/src/emqx_listeners.erl

@@ -306,10 +306,13 @@ do_flatten_listeners(Type, Conf0) ->
 
 esockd_opts(Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
-    Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of
-        infinity -> Opts1;
-        Rate -> Opts1#{max_conn_rate => Rate}
-    end,
+    Limiter = limiter(Opts0),
+    Opts2 = case maps:get(connection, Limiter, undefined) of
+                undefined ->
+                    Opts1;
+                BucketName ->
+                    Opts1#{limiter => emqx_htb_generic:new_create_options(connection, BucketName)}
+            end,
     Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))
                   , tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
                   },

+ 1 - 17
apps/emqx/src/emqx_schema.erl

@@ -163,11 +163,8 @@ this number of messages or bytes have passed through."""
    , {"conn_congestion",
        sc(ref("conn_congestion"),
           #{})}
-   , {"quota",
-       sc(ref("quota"),
-          #{})}
    , {"stats",
-       sc(ref("stats"),
+      sc(ref("stats"),
           #{})}
    , {"sysmon",
        sc(ref("sysmon"),
@@ -494,19 +491,6 @@ fields("rate_limit") ->
        }
     ];
 
-fields("quota") ->
-    [ {"conn_messages_routing",
-       sc(hoconsc:union([infinity, comma_separated_list()]),
-          #{ default => infinity
-           })
-       }
-    , {"overall_messages_routing",
-       sc(hoconsc:union([infinity, comma_separated_list()]),
-          #{ default => infinity
-           })
-      }
-    ];
-
 fields("flapping_detect") ->
     [ {"enable",
        sc(boolean(),

+ 1 - 1
apps/emqx/src/emqx_zone_schema.erl

@@ -23,7 +23,7 @@ namespace() -> zone.
 %% this shcema module is not used at root level.
 %% roots are added only for document generation.
 roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown",
-            "conn_congestion", "rate_limit", "quota", "force_gc",
+            "conn_congestion", "force_gc",
             "overload_protection"
            ].
 

+ 1 - 1
mix.exs

@@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do
       {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
-      {:esockd, github: "emqx/esockd", tag: "5.9.0", override: true},
+      {:esockd, github: "emqx/esockd", tag: "5.9.1", override: true},
       {:mria, github: "emqx/mria", tag: "0.2.0", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.12.1", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true},

+ 1 - 1
rebar.config

@@ -52,7 +52,7 @@
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
-    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
+    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}}
     , {mria, {git, "https://github.com/emqx/mria", {tag, "0.2.0"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}