Sfoglia il codice sorgente

Merge pull request #5833 from lafirest/feat/ratelimit

feat/rate_limiter: implement hierarchical token buckets
lafirest 4 anni fa
parent
commit
04d549021b

+ 1 - 0
apps/emqx/src/emqx_schema.erl

@@ -72,6 +72,7 @@
 -export([namespace/0, roots/0, roots/1, fields/1]).
 -export([conf_get/2, conf_get/3, keys/2, filter/1]).
 -export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]).
+-export([sc/2, map/2]).
 
 namespace() -> undefined.
 

+ 6 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -368,14 +368,18 @@ typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example =
 typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash};
 typename_to_spec("log_level()", _Mod) ->
     #{type => string, enum => [debug, info, notice, warning, error, critical, alert, emergency, all]};
+typename_to_spec("rate()", _Mod) ->
+    #{type => string, example => <<"10M/s">>};
+typename_to_spec("bucket_rate()", _Mod) ->
+    #{type => string, example => <<"10M/s, 100M">>};
 typename_to_spec(Name, Mod) ->
     Spec = range(Name),
     Spec1 = remote_module_type(Spec, Name, Mod),
     Spec2 = typerefl_array(Spec1, Name, Mod),
     Spec3 = integer(Spec2, Name),
     Spec3 =:= nomatch andalso
-        throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}),
-    Spec3.
+                 throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}),
+             Spec3.
 
 range(Name) ->
     case string:split(Name, "..") of

+ 50 - 0
apps/emqx_limiter/etc/emqx_limiter.conf

@@ -0,0 +1,50 @@
+##--------------------------------------------------------------------
+## Emq X Rate Limiter
+##--------------------------------------------------------------------
+emqx_limiter {
+  bytes_in {
+    global = "100KB/10s"         # token generation rate
+    zone.default = "100kB/10s"
+    zone.external = "20kB/10s"
+    bucket.tcp {
+      zone = default
+      aggregated = "100kB/10s,1Mb"
+      per_client = "100KB/10s,10Kb"
+    }
+    bucket.ssl {
+      zone = external
+      aggregated = "100kB/10s,1Mb"
+      per_client = "100KB/10s,10Kb"
+    }
+  }
+
+  message_in {
+    global = "100/10s"
+    zone.default = "100/10s"
+    bucket.bucket1 {
+      zone = default
+      aggregated = "100/10s,1000"
+      per_client = "100/10s,100"
+    }
+  }
+
+  connection {
+    global = "100/10s"
+    zone.default = "100/10s"
+    bucket.bucket1 {
+      zone = default
+      aggregated = "100/10s,1000"
+      per_client = "100/10s,100"
+    }
+  }
+
+  message_routing {
+    global = "100/10s"
+    zone.default = "100/10s"
+    bucket.bucket1 {
+      zone = default
+      aggregated = "100/10s,100"
+      per_client = "100/10s,10"
+    }
+  }
+}

+ 15 - 0
apps/emqx_limiter/src/emqx_limiter.app.src

@@ -0,0 +1,15 @@
+%% -*- mode: erlang -*-
+{application, emqx_limiter,
+ [{description, "EMQ X Hierachical Limiter"},
+  {vsn, "1.0.0"}, % strict semver, bump manually!
+  {modules, []},
+  {registered, [emqx_limiter_sup]},
+  {applications, [kernel,stdlib,emqx]},
+  {mod, {emqx_limiter_app,[]}},
+  {env, []},
+  {licenses, ["Apache-2.0"]},
+  {maintainers, ["EMQ X Team <contact@emqx.io>"]},
+  {links, [{"Homepage", "https://emqx.io/"},
+           {"Github", "https://github.com/emqx/emqx-retainer"}
+          ]}
+ ]}.

+ 55 - 0
apps/emqx_limiter/src/emqx_limiter_app.erl

@@ -0,0 +1,55 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called whenever an application is started using
+%% application:start/[1,2], and should start the processes of the
+%% application. If the application is structured according to the OTP
+%% design principles as a supervision tree, this means starting the
+%% top supervisor of the tree.
+%% @end
+%%--------------------------------------------------------------------
+-spec start(StartType :: normal |
+                         {takeover, Node :: node()} |
+                         {failover, Node :: node()},
+            StartArgs :: term()) ->
+          {ok, Pid :: pid()} |
+          {ok, Pid :: pid(), State :: term()} |
+          {error, Reason :: term()}.
+start(_StartType, _StartArgs) ->
+    {ok, _} = Result = emqx_limiter_sup:start_link(),
+    Result.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called whenever an application has stopped. It
+%% is intended to be the opposite of Module:start/2 and should do
+%% any necessary cleaning up. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+-spec stop(State :: term()) -> any().
+stop(_State) ->
+    ok.

+ 144 - 0
apps/emqx_limiter/src/emqx_limiter_client.erl

@@ -0,0 +1,144 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_client).
+
+%% API
+-export([create/5, make_ref/3, consume/2]).
+-export_type([limiter/0]).
+
+%% tocket bucket algorithm
+-record(limiter, { tokens :: non_neg_integer()
+                 , rate :: float()
+                 , capacity :: decimal()
+                 , lasttime :: millisecond()
+                 , ref :: ref_limiter()
+                 }).
+
+-record(ref, { counter :: counters:counters_ref()
+             , index :: index()
+             , rate :: decimal()
+             , obtained :: non_neg_integer()
+             }).
+
+%% TODO
+%% we should add a nop-limiter, when all the upper layers (global, zone, and buckets ) are infinity
+
+-type limiter() :: #limiter{}.
+-type ref_limiter() :: #ref{}.
+-type client() :: limiter() | ref_limiter().
+-type millisecond() :: non_neg_integer().
+-type pause_result(Client) :: {pause, millisecond(), Client}.
+-type consume_result(Client) :: {ok, Client}
+                              | pause_result(Client).
+-type decimal() :: emqx_limiter_decimal:decimal().
+-type index() :: emqx_limiter_server:index().
+
+-define(NOW, erlang:monotonic_time(millisecond)).
+-define(MINIUMN_PAUSE, 100).
+
+-import(emqx_limiter_decimal, [sub/2]).
+%%--------------------------------------------------------------------
+%%  API
+%%--------------------------------------------------------------------
+-spec create(float(),
+             decimal(),
+             counters:counters_ref(),
+             index(),
+             decimal()) -> limiter().
+create(Rate, Capacity, Counter, Index, CounterRate) ->
+    #limiter{ tokens = Capacity
+            , rate = Rate
+            , capacity = Capacity
+            , lasttime = ?NOW
+            , ref = make_ref(Counter, Index, CounterRate)
+            }.
+
+-spec make_ref(counters:counters_ref(), index(), decimal()) -> ref_limiter().
+make_ref(Counter, Idx, Rate) ->
+    #ref{counter = Counter, index = Idx, rate = Rate, obtained = 0}.
+
+-spec consume(pos_integer(), Client) -> consume_result(Client)
+              when Client :: client().
+consume(Need, #limiter{tokens = Tokens,
+                       capacity = Capacity} = Limiter) ->
+    if Need =< Tokens ->
+            try_consume_counter(Need, Limiter);
+       Need > Capacity ->
+            %% FIXME
+            %% The client should be able to send 4kb data if the rate is configured to be 2kb/s, it just needs 2s to complete.
+            throw("too big request"); %% FIXME how to deal this?
+       true ->
+            try_reset(Need, Limiter)
+    end;
+
+consume(Need, #ref{counter = Counter,
+                   index = Index,
+                   rate = Rate,
+                   obtained = Obtained} = Ref) ->
+    Tokens = counters:get(Counter, Index),
+    if Tokens >= Need ->
+            counters:sub(Counter, Index, Need),
+            {ok, Ref#ref{obtained = Obtained + Need}};
+       true ->
+            return_pause(Need - Tokens, Rate, Ref)
+    end.
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------
+-spec try_consume_counter(pos_integer(), limiter()) -> consume_result(limiter()).
+try_consume_counter(Need,
+                    #limiter{tokens = Tokens,
+                             ref = #ref{counter = Counter,
+                                        index = Index,
+                                        obtained = Obtained,
+                                        rate = CounterRate} = Ref} = Limiter) ->
+    CT = counters:get(Counter, Index),
+    if CT >= Need ->
+            counters:sub(Counter, Index, Need),
+            {ok, Limiter#limiter{tokens = sub(Tokens, Need),
+                                 ref = Ref#ref{obtained = Obtained + Need}}};
+       true ->
+            return_pause(Need - CT, CounterRate, Limiter)
+    end.
+
+-spec try_reset(pos_integer(), limiter()) -> consume_result(limiter()).
+try_reset(Need,
+          #limiter{tokens = Tokens,
+                   rate = Rate,
+                   lasttime = LastTime,
+                   capacity = Capacity} = Limiter) ->
+    Now = ?NOW,
+    Inc = erlang:floor((Now - LastTime) * Rate / emqx_limiter_schema:minimum_period()),
+    Tokens2 = erlang:min(Tokens + Inc, Capacity),
+    if Need > Tokens2 ->
+            return_pause(Need, Rate, Limiter);
+       true ->
+            Limiter2 = Limiter#limiter{tokens = Tokens2,
+                                       lasttime = Now},
+            try_consume_counter(Need, Limiter2)
+    end.
+
+-spec return_pause(pos_integer(), decimal(), Client) -> pause_result(Client)
+              when Client :: client().
+return_pause(_, infinity, Limiter) ->
+    %% workaround when emqx_limiter_server's rate is infinity
+    {pause, ?MINIUMN_PAUSE, Limiter};
+
+return_pause(Diff, Rate, Limiter) ->
+    Pause = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate),
+    {pause, erlang:max(Pause, ?MINIUMN_PAUSE), Limiter}.

+ 79 - 0
apps/emqx_limiter/src/emqx_limiter_decimal.erl

@@ -0,0 +1,79 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% a simple decimal module for rate-related calculations
+
+-module(emqx_limiter_decimal).
+
+%% API
+-export([ add/2, sub/2, mul/2
+        , add_to_counter/3, put_to_counter/3]).
+-export_type([decimal/0, zero_or_float/0]).
+
+-type decimal() :: infinity | number().
+-type zero_or_float() :: 0 | float().
+
+%%--------------------------------------------------------------------
+%%% API
+%%--------------------------------------------------------------------
+-spec add(decimal(), decimal()) -> decimal().
+add(A, B) when A =:= infinity
+               orelse B =:= infinity ->
+    infinity;
+
+add(A, B) ->
+    A + B.
+
+-spec sub(decimal(), decimal()) -> decimal().
+sub(A, B) when A =:= infinity
+               orelse B =:= infinity ->
+    infinity;
+
+sub(A, B) ->
+    A - B.
+
+-spec mul(decimal(), decimal()) -> decimal().
+mul(A, B) when A =:= infinity
+               orelse B =:= infinity ->
+    infinity;
+
+mul(A, B) ->
+    A * B.
+
+-spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) ->
+          {zero_or_float(), zero_or_float()}.
+add_to_counter(_, _, infinity) ->
+    {0, 0};
+add_to_counter(Counter, Index, Val) when is_float(Val) ->
+    IntPart = erlang:floor(Val),
+    if IntPart > 0 ->
+            counters:add(Counter, Index, IntPart);
+       true ->
+            ok
+    end,
+    {IntPart, Val - IntPart};
+add_to_counter(Counter, Index, Val) ->
+    counters:add(Counter, Index, Val),
+    {Val, 0}.
+
+-spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok.
+put_to_counter(_, _, infinity) ->
+    ok;
+put_to_counter(Counter, Index, Val) when is_float(Val) ->
+    IntPart = erlang:floor(Val),
+    counters:put(Counter, Index, IntPart);
+put_to_counter(Counter, Index, Val) ->
+    counters:put(Counter, Index, Val).

+ 229 - 0
apps/emqx_limiter/src/emqx_limiter_manager.erl

@@ -0,0 +1,229 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_manager).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+%% API
+-export([ start_link/0, start_server/1, find_counter/1
+        , find_counter/3, insert_counter/4, insert_counter/6
+        , make_path/3, restart_server/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3, format_status/2]).
+
+-type path() :: list(atom()).
+-type limiter_type() :: emqx_limiter_schema:limiter_type().
+-type zone_name() :: emqx_limiter_schema:zone_name().
+-type bucket_name() :: emqx_limiter_schema:bucket_name().
+
+%% counter record in ets table
+-record(element, {path :: path(),
+                  counter :: counters:counters_ref(),
+                  index :: index(),
+                  rate :: rate()
+                 }).
+
+
+-type index() :: emqx_limiter_server:index().
+-type rate() :: emqx_limiter_decimal:decimal().
+
+-define(TAB, emqx_limiter_counters).
+
+%%--------------------------------------------------------------------
+%%  API
+%%--------------------------------------------------------------------
+-spec start_server(limiter_type()) -> _.
+start_server(Type) ->
+    emqx_limiter_server_sup:start(Type).
+
+-spec restart_server(limiter_type()) -> _.
+restart_server(Type) ->
+    emqx_limiter_server_sup:restart(Type).
+
+-spec find_counter(limiter_type(), zone_name(), bucket_name()) ->
+          {ok, counters:counters_ref(), index(), rate()} | undefined.
+find_counter(Type, Zone, BucketId) ->
+    find_counter(make_path(Type, Zone, BucketId)).
+
+-spec find_counter(path()) ->
+          {ok, counters:counters_ref(), index(), rate()} | undefined.
+find_counter(Path) ->
+    case ets:lookup(?TAB, Path) of
+        [#element{counter = Counter, index = Index, rate = Rate}] ->
+            {ok, Counter, Index, Rate};
+        _ ->
+            undefined
+    end.
+
+-spec insert_counter(limiter_type(),
+                     zone_name(),
+                     bucket_name(),
+                     counters:counters_ref(),
+                     index(),
+                     rate()) -> boolean().
+insert_counter(Type, Zone, BucketId, Counter, Index, Rate) ->
+    insert_counter(make_path(Type, Zone, BucketId),
+                   Counter,
+                   Index,
+                   Rate).
+
+-spec insert_counter(path(),
+                     counters:counters_ref(),
+                     index(),
+                     rate()) -> boolean().
+insert_counter(Path, Counter, Index, Rate) ->
+    ets:insert(?TAB,
+               #element{path = Path,
+                        counter = Counter,
+                        index = Index,
+                        rate = Rate}).
+
+-spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
+make_path(Type, Name, BucketId) ->
+    [Type, Name, BucketId].
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%% @end
+%%--------------------------------------------------------------------
+-spec start_link() -> {ok, Pid :: pid()} |
+          {error, Error :: {already_started, pid()}} |
+          {error, Error :: term()} |
+          ignore.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%%  gen_server callbacks
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%% @end
+%%--------------------------------------------------------------------
+-spec init(Args :: term()) -> {ok, State :: term()} |
+          {ok, State :: term(), Timeout :: timeout()} |
+          {ok, State :: term(), hibernate} |
+          {stop, Reason :: term()} |
+          ignore.
+init([]) ->
+    _ = ets:new(?TAB, [ set, public, named_table, {keypos, #element.path}
+                      , {write_concurrency, true}, {read_concurrency, true}
+                      , {heir, erlang:whereis(emqx_limiter_sup), none}
+                      ]),
+    {ok, #{}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
+          {reply, Reply :: term(), NewState :: term()} |
+          {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
+          {reply, Reply :: term(), NewState :: term(), hibernate} |
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
+          {stop, Reason :: term(), NewState :: term()}.
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignore, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_cast(Request :: term(), State :: term()) ->
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: term(), NewState :: term()}.
+handle_cast(Req, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Req]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_info(Info :: timeout() | term(), State :: term()) ->
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: normal | term(), NewState :: term()}.
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
+                State :: term()) -> any().
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+-spec code_change(OldVsn :: term() | {down, term()},
+                  State :: term(),
+                  Extra :: term()) -> {ok, NewState :: term()} |
+          {error, Reason :: term()}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called for changing the form and appearance
+%% of gen_server status when it is returned from sys:get_status/1,2
+%% or when it appears in termination error logs.
+%% @end
+%%--------------------------------------------------------------------
+-spec format_status(Opt :: normal | terminate,
+                    Status :: list()) -> Status :: term().
+format_status(_Opt, Status) ->
+    Status.
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------

+ 140 - 0
apps/emqx_limiter/src/emqx_limiter_schema.erl

@@ -0,0 +1,140 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-export([ roots/0, fields/1, to_rate/1
+        , to_bucket_rate/1, minimum_period/0]).
+
+-define(KILOBYTE, 1024).
+
+-type limiter_type() :: bytes_in
+                      | message_in
+                      | connection
+                      | message_routing.
+
+-type bucket_name() :: atom().
+-type zone_name() :: atom().
+-type rate() :: infinity | float().
+-type bucket_rate() :: list(infinity | number()).
+
+-typerefl_from_string({rate/0, ?MODULE, to_rate}).
+-typerefl_from_string({bucket_rate/0, ?MODULE, to_bucket_rate}).
+
+-reflect_type([ rate/0
+              , bucket_rate/0
+              ]).
+
+-export_type([limiter_type/0, bucket_name/0, zone_name/0]).
+
+-import(emqx_schema, [sc/2, map/2]).
+
+roots() -> [emqx_limiter].
+
+fields(emqx_limiter) ->
+    [ {bytes_in, sc(ref(limiter), #{})}
+    , {message_in, sc(ref(limiter), #{})}
+    , {connection, sc(ref(limiter), #{})}
+    , {message_routing, sc(ref(limiter), #{})}
+    ];
+
+fields(limiter) ->
+    [ {global, sc(rate(), #{})}
+    , {zone, sc(map("zone name", rate()), #{})}
+    , {bucket, sc(map("bucket id", ref(bucket)),
+                  #{desc => "Token Buckets"})}
+    ];
+
+fields(bucket) ->
+    [ {zone, sc(atom(), #{desc => "the zone which the bucket in"})}
+    , {aggregated, sc(bucket_rate(), #{})}
+    , {per_client, sc(bucket_rate(), #{})}
+    ].
+
+%% minimum period is 100ms
+minimum_period() ->
+    100.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+ref(Field) -> hoconsc:ref(?MODULE, Field).
+
+to_rate(Str) ->
+    Tokens = [string:trim(T) || T <- string:tokens(Str, "/")],
+    case Tokens of
+        ["infinity"] ->
+            {ok, infinity};
+        [Quota, Interval] ->
+            {ok, Val} = to_quota(Quota),
+            case emqx_schema:to_duration_ms(Interval) of
+                {ok, Ms} when Ms > 0 ->
+                    {ok, Val * minimum_period() / Ms};
+                _ ->
+                    {error, Str}
+            end;
+        _ ->
+            {error, Str}
+    end.
+
+to_bucket_rate(Str) ->
+    Tokens = [string:trim(T) || T <- string:tokens(Str, "/,")],
+    case Tokens of
+        [Rate, Capa] ->
+            {ok, infinity} = to_quota(Rate),
+            {ok, CapaVal} = to_quota(Capa),
+            if CapaVal =/= infinity ->
+                    {ok, [infinity, CapaVal]};
+               true ->
+                    {error, Str}
+            end;
+        [Quota, Interval, Capacity] ->
+            {ok, Val} = to_quota(Quota),
+            case emqx_schema:to_duration_ms(Interval) of
+                {ok, Ms} when Ms > 0 ->
+                    {ok, CapaVal} = to_quota(Capacity),
+                    {ok, [Val * minimum_period() / Ms, CapaVal]};
+                _ ->
+                    {error, Str}
+            end;
+        _ ->
+            {error, Str}
+    end.
+
+
+to_quota(Str) ->
+    {ok, MP} = re:compile("^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$"),
+    Result = re:run(Str, MP, [{capture, all_but_first, list}]),
+    case Result of
+        {match, [Quota, Unit]} ->
+            Val = erlang:list_to_integer(Quota),
+            Unit2 = string:to_lower(Unit),
+            {ok, apply_unit(Unit2, Val)};
+        {match, [Quota]} ->
+            {ok, erlang:list_to_integer(Quota)};
+        {match, []} ->
+            {ok, infinity};
+        _ ->
+            {error, Str}
+    end.
+
+apply_unit("", Val) -> Val;
+apply_unit("kb", Val) -> Val * ?KILOBYTE;
+apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
+apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
+apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).

+ 426 - 0
apps/emqx_limiter/src/emqx_limiter_server.erl

@@ -0,0 +1,426 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% A hierachical token bucket algorithm
+%% Note: this is not the linux HTB algorithm(http://luxik.cdi.cz/~devik/qos/htb/manual/theory.htm)
+%% Algorithm:
+%% 1. the root node periodically generates tokens and then distributes them
+%% just like the oscillation of water waves
+%% 2. the leaf node has a counter, which is the place where the token is actually held.
+%% 3. other nodes only play the role of transmission, and the rate of the node is like a valve,
+%% limiting the oscillation transmitted from the parent node
+
+-module(emqx_limiter_server).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3, format_status/2]).
+
+-export([ start_link/1, connect/2, info/2
+        , name/1]).
+
+-record(root, { rate :: rate()             %% number of tokens generated per period
+              , period :: pos_integer()    %% token generation interval(second)
+              , childs :: list(node_id())  %% node children
+              , consumed :: non_neg_integer()
+              }).
+
+-record(zone, { id :: pos_integer()
+              , name :: zone_name()
+              , rate :: rate()
+              , obtained :: non_neg_integer()       %% number of tokens obtained
+              , childs :: list(node_id())
+              }).
+
+-record(bucket, { id :: pos_integer()
+                , name :: bucket_name()
+                , rate :: rate()
+                , obtained :: non_neg_integer()
+                , correction :: emqx_limiter_decimal:zero_or_float() %% token correction value
+                , capacity :: capacity()
+                , counter :: counters:counters_ref()
+                , index :: index()
+                }).
+
+-record(state, { root :: undefined | root()
+               , counter :: undefined | counters:counters_ref() %% current counter to alloc
+               , index :: index()
+               , zones :: #{zone_name() => node_id()}
+               , nodes :: nodes()
+               , type :: limiter_type()
+               }).
+
+%% maybe use maps is better, but record is fastter
+-define(FIELD_OBTAINED, #zone.obtained).
+-define(GET_FIELD(F, Node), element(F, Node)).
+-define(CALL(Type, Msg), gen_server:call(name(Type), {?FUNCTION_NAME, Msg})).
+
+-type node_id() :: pos_integer().
+-type root() :: #root{}.
+-type zone() :: #zone{}.
+-type bucket() :: #bucket{}.
+-type node_data() :: zone() | bucket().
+-type nodes() :: #{node_id() => node_data()}.
+-type zone_name() :: emqx_limiter_schema:zone_name().
+-type limiter_type() :: emqx_limiter_schema:limiter_type().
+-type bucket_name() :: emqx_limiter_schema:bucket_name().
+-type rate() :: decimal().
+-type flow() :: decimal().
+-type capacity() :: decimal().
+-type decimal() :: emqx_limiter_decimal:decimal().
+-type state() :: #state{}.
+-type index() :: pos_integer().
+
+-export_type([index/0]).
+-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+-spec connect(limiter_type(), bucket_name()) -> emqx_limiter_client:client().
+connect(Type, Bucket) ->
+    #{zone := Zone,
+      aggregated := [Aggr, Capacity],
+      per_client := [Client, ClientCapa]} = emqx:get_config([emqx_limiter, Type, bucket, Bucket]),
+    case emqx_limiter_manager:find_counter(Type, Zone, Bucket) of
+        {ok, Counter, Idx, Rate} ->
+            if Client =/= infinity andalso (Client < Aggr orelse ClientCapa < Capacity) ->
+                    emqx_limiter_client:create(Client, ClientCapa, Counter, Idx, Rate);
+               true ->
+                    emqx_limiter_client:make_ref(Counter, Idx, Rate)
+            end;
+        _ ->
+            ?LOG(error, "can't find the bucket:~p which type is:~p~n", [Bucket, Type]),
+            throw("invalid bucket")
+    end.
+
+-spec info(limiter_type(), atom()) -> term().
+info(Type, Info) ->
+    ?CALL(Type, Info).
+
+-spec name(limiter_type()) -> atom().
+name(Type) ->
+    erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%% @end
+%%--------------------------------------------------------------------
+-spec start_link(limiter_type()) -> _.
+start_link(Type) ->
+    gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
+
+%%--------------------------------------------------------------------
+%%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%% @end
+%%--------------------------------------------------------------------
+-spec init(Args :: term()) -> {ok, State :: term()} |
+          {ok, State :: term(), Timeout :: timeout()} |
+          {ok, State :: term(), hibernate} |
+          {stop, Reason :: term()} |
+          ignore.
+init([Type]) ->
+    State = #state{zones = #{},
+                   nodes = #{},
+                   type = Type,
+                   index = 1},
+    State2 = init_tree(Type, State),
+    oscillate(State2#state.root#root.period),
+    {ok, State2}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
+          {reply, Reply :: term(), NewState :: term()} |
+          {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
+          {reply, Reply :: term(), NewState :: term(), hibernate} |
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
+          {stop, Reason :: term(), NewState :: term()}.
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_cast(Request :: term(), State :: term()) ->
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: term(), NewState :: term()}.
+handle_cast(Req, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Req]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%% @end
+%%--------------------------------------------------------------------
+-spec handle_info(Info :: timeout() | term(), State :: term()) ->
+          {noreply, NewState :: term()} |
+          {noreply, NewState :: term(), Timeout :: timeout()} |
+          {noreply, NewState :: term(), hibernate} |
+          {stop, Reason :: normal | term(), NewState :: term()}.
+handle_info(oscillate, State) ->
+    {noreply, oscillation(State)};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%% @end
+%%--------------------------------------------------------------------
+-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
+                State :: term()) -> any().
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%% @end
+%%--------------------------------------------------------------------
+-spec code_change(OldVsn :: term() | {down, term()},
+                  State :: term(),
+                  Extra :: term()) -> {ok, NewState :: term()} |
+          {error, Reason :: term()}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called for changing the form and appearance
+%% of gen_server status when it is returned from sys:get_status/1,2
+%% or when it appears in termination error logs.
+%% @end
+%%--------------------------------------------------------------------
+-spec format_status(Opt :: normal | terminate,
+                    Status :: list()) -> Status :: term().
+format_status(_Opt, Status) ->
+    Status.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+oscillate(Interval) ->
+    erlang:send_after(Interval, self(), ?FUNCTION_NAME).
+
+%% @doc generate tokens, and then spread to leaf nodes
+-spec oscillation(state()) -> state().
+oscillation(#state{root = #root{rate = Flow,
+                                period = Interval,
+                                childs = ChildIds,
+                                consumed = Consumed} = Root,
+                   nodes = Nodes} = State) ->
+    oscillate(Interval),
+    Childs = get_orderd_childs(ChildIds, Nodes),
+    {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
+    State#state{nodes = Nodes2,
+                root = Root#root{consumed = Consumed + Alloced}}.
+
+%% @doc horizontal spread
+-spec transverse(list(node_data()),
+                 flow(),
+                 non_neg_integer(),
+                 nodes()) -> {non_neg_integer(), nodes()}.
+transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 ->
+    {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes),
+    InFlow2 = sub(InFlow, NodeAlloced),
+    Alloced2 = Alloced + NodeAlloced,
+    transverse(T, InFlow2, Alloced2, Nodes2);
+
+transverse(_, _, Alloced, Nodes) ->
+    {Alloced, Nodes}.
+
+%% @doc vertical spread
+-spec longitudinal(node_data(), flow(), nodes()) ->
+          {non_neg_integer(), nodes()}.
+longitudinal(#zone{id = Id,
+                   rate = Rate,
+                   obtained = Obtained,
+                   childs = ChildIds} = Node, InFlow, Nodes) ->
+    Flow = erlang:min(InFlow, Rate),
+
+    if Flow > 0 ->
+            Childs = get_orderd_childs(ChildIds, Nodes),
+            {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
+            if Alloced > 0 ->
+                    {Alloced,
+                     Nodes2#{Id => Node#zone{obtained = Obtained + Alloced}}};
+               true ->
+                    %% childs are empty or all counter childs are full
+                    {0, Nodes}
+            end;
+       true ->
+            {0, Nodes}
+    end;
+
+longitudinal(#bucket{id = Id,
+                     rate = Rate,
+                     capacity = Capacity,
+                     correction = Correction,
+                     counter = Counter,
+                     index = Index,
+                     obtained = Obtained} = Node, InFlow, Nodes) ->
+    Flow = add(erlang:min(InFlow, Rate), Correction),
+
+    Tokens = counters:get(Counter, Index),
+    %% toknes's value mayb be a negative value(stolen from the future)
+    Avaiable = erlang:min(if Tokens < 0 ->
+                                  add(Capacity, Tokens);
+                             true ->
+                                  sub(Capacity, Tokens)
+                          end, Flow),
+    FixAvaiable = erlang:min(Capacity, Avaiable),
+    if FixAvaiable > 0 ->
+            {Alloced, Decimal} = add_to_counter(Counter, Index, FixAvaiable),
+
+            {Alloced,
+             Nodes#{Id => Node#bucket{obtained = Obtained + Alloced,
+                                      correction = Decimal}}};
+       true ->
+            {0, Nodes}
+    end.
+
+-spec get_orderd_childs(list(node_id()), nodes()) -> list(node_data()).
+get_orderd_childs(Ids, Nodes) ->
+    Childs = [maps:get(Id, Nodes) || Id <- Ids],
+
+    %% sort by obtained, avoid node goes hungry
+    lists:sort(fun(A, B) ->
+                       ?GET_FIELD(?FIELD_OBTAINED, A) < ?GET_FIELD(?FIELD_OBTAINED, B)
+               end,
+               Childs).
+
+-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
+init_tree(Type, State) ->
+    #{global := Global,
+      zone := Zone,
+      bucket := Bucket} = emqx:get_config([emqx_limiter, Type]),
+    {Factor, Root} = make_root(Global, Zone),
+    State2 = State#state{root = Root},
+    {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
+    State4 = State3#state{counter = counters:new(maps:size(Bucket),
+                                                 [write_concurrency])},
+    make_bucket(maps:to_list(Bucket), Factor, NodeId, State4).
+
+-spec make_root(decimal(), hocon:config()) -> {number(), root()}.
+make_root(Rate, Zone) ->
+    ZoneNum = maps:size(Zone),
+    Childs = lists:seq(1, ZoneNum),
+    MiniPeriod = emqx_limiter_schema:minimum_period(),
+    if Rate >= 1 ->
+            {1, #root{rate = Rate,
+                      period = MiniPeriod,
+                      childs = Childs,
+                      consumed = 0}};
+       true ->
+            Factor = 1 / Rate,
+            {Factor, #root{rate = 1,
+                           period = erlang:floor(Factor * MiniPeriod),
+                           childs = Childs,
+                           consumed = 0}}
+    end.
+
+make_zone([{Name, Rate} | T], Factor, NodeId, State) ->
+    #state{zones = Zones, nodes = Nodes} = State,
+    Zone = #zone{id = NodeId,
+                 name = Name,
+                 rate = mul(Rate, Factor),
+                 obtained = 0,
+                 childs = []},
+    State2 = State#state{zones = Zones#{Name => NodeId},
+                         nodes = Nodes#{NodeId => Zone}},
+    make_zone(T, Factor, NodeId + 1, State2);
+
+make_zone([], _, NodeId, State2) ->
+    {NodeId, State2}.
+
+make_bucket([{Name, Conf} | T], Factor, NodeId, State) ->
+    #{zone := ZoneName,
+      aggregated := [Rate, Capacity]} = Conf,
+    {Counter, Idx, State2} = alloc_counter(ZoneName, Name, Rate, State),
+    Node = #bucket{ id = NodeId
+                  , name = Name
+                  , rate = mul(Rate, Factor)
+                  , obtained = 0
+                  , correction = 0
+                  , capacity = Capacity
+                  , counter = Counter
+                  , index = Idx},
+    State3 = add_zone_child(NodeId, Node, ZoneName, State2),
+    make_bucket(T, Factor, NodeId + 1, State3);
+
+make_bucket([], _, _, State) ->
+    State.
+
+-spec alloc_counter(zone_name(), bucket_name(), rate(), state()) ->
+          {counters:counters_ref(), pos_integer(), state()}.
+alloc_counter(Zone, Bucket, Rate,
+              #state{type = Type, counter = Counter, index = Index} = State) ->
+    Path = emqx_limiter_manager:make_path(Type, Zone, Bucket),
+    case emqx_limiter_manager:find_counter(Path) of
+        undefined ->
+            init_counter(Path, Counter, Index,
+                         Rate, State#state{index = Index + 1});
+        {ok, ECounter, EIndex, _} ->
+            init_counter(Path, ECounter, EIndex, Rate, State)
+    end.
+
+init_counter(Path, Counter, Index, Rate, State) ->
+    _ = put_to_counter(Counter, Index, 0),
+    emqx_limiter_manager:insert_counter(Path, Counter, Index, Rate),
+    {Counter, Index, State}.
+
+-spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state().
+add_zone_child(NodeId, Bucket, Name, #state{zones = Zones, nodes = Nodes} = State) ->
+    ZoneId = maps:get(Name, Zones),
+    #zone{childs = Childs} = Zone = maps:get(ZoneId, Nodes),
+    Nodes2 = Nodes#{ZoneId => Zone#zone{childs = [NodeId | Childs]},
+                    NodeId => Bucket},
+    State#state{nodes = Nodes2}.

+ 94 - 0
apps/emqx_limiter/src/emqx_limiter_server_sup.erl

@@ -0,0 +1,94 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_server_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, start/1, restart/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%%--==================================================================
+%%  API functions
+%%--==================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the supervisor
+%% @end
+%%--------------------------------------------------------------------
+-spec start_link() -> {ok, Pid :: pid()} |
+          {error, {already_started, Pid :: pid()}} |
+          {error, {shutdown, term()}} |
+          {error, term()} |
+          ignore.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+-spec start(emqx_limiter_schema:limiter_type()) -> _.
+start(Type) ->
+    Spec = make_child(Type),
+    supervisor:start_child(?MODULE, Spec).
+
+-spec restart(emqx_limiter_schema:limiter_type()) -> _.
+restart(Type) ->
+    Id = emqx_limiter_server:name(Type),
+    _ = supervisor:terminate_child(?MODULE, Id),
+    supervisor:restart_child(?MODULE, Id).
+
+%%--==================================================================
+%%  Supervisor callbacks
+%%--==================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever a supervisor is started using supervisor:start_link/[2,3],
+%% this function is called by the new process to find out about
+%% restart strategy, maximum restart intensity, and child
+%% specifications.
+%% @end
+%%--------------------------------------------------------------------
+-spec init(Args :: term()) ->
+          {ok, {SupFlags :: supervisor:sup_flags(),
+                [ChildSpec :: supervisor:child_spec()]}} |
+          ignore.
+init([]) ->
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 10,
+                 period => 3600},
+
+    {ok, {SupFlags, childs()}}.
+
+%%--==================================================================
+%%  Internal functions
+%%--==================================================================
+make_child(Type) ->
+    Id = emqx_limiter_server:name(Type),
+    #{id => Id,
+      start => {emqx_limiter_server, start_link, [Type]},
+      restart => transient,
+      shutdown => 5000,
+      type => worker,
+      modules => [emqx_limiter_server]}.
+
+childs() ->
+    Conf = emqx:get_config([emqx_limiter]),
+    Types = maps:keys(Conf),
+    [make_child(Type) || Type <- Types].

+ 76 - 0
apps/emqx_limiter/src/emqx_limiter_sup.erl

@@ -0,0 +1,76 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%%--------------------------------------------------------------------
+%%  API functions
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the supervisor
+%% @end
+%%--------------------------------------------------------------------
+-spec start_link() -> {ok, Pid :: pid()} |
+          {error, {already_started, Pid :: pid()}} |
+          {error, {shutdown, term()}} |
+          {error, term()} |
+          ignore.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%--------------------------------------------------------------------
+%%  Supervisor callbacks
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Whenever a supervisor is started using supervisor:start_link/[2,3],
+%% this function is called by the new process to find out about
+%% restart strategy, maximum restart intensity, and child
+%% specifications.
+%% @end
+%%--------------------------------------------------------------------
+-spec init(Args :: term()) ->
+          {ok, {SupFlags :: supervisor:sup_flags(),
+                [ChildSpec :: supervisor:child_spec()]}} |
+          ignore.
+init([]) ->
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 10,
+                 period => 3600},
+
+    Childs = [ make_child(emqx_limiter_manager, worker)
+             , make_child(emqx_limiter_server_sup, supervisor)],
+
+    {ok, {SupFlags, Childs}}.
+
+make_child(Mod, Type) ->
+    #{id => Mod,
+      start => {Mod, start_link, []},
+      restart => transient,
+      type => Type,
+      modules => [Mod]}.

+ 272 - 0
apps/emqx_limiter/test/emqx_limiter_SUITE.erl

@@ -0,0 +1,272 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(APP, emqx_limiter).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(BASE_CONF, <<"""
+emqx_limiter {
+  bytes_in {global = \"100KB/10s\"
+            zone.default = \"100kB/10s\"
+            zone.external = \"20kB/10s\"
+            bucket.tcp {zone = default
+                        aggregated = \"100kB/10s,1Mb\"
+                        per_client = \"100KB/10s,10Kb\"}
+            bucket.ssl {zone = external
+                        aggregated = \"100kB/10s,1Mb\"
+                        per_client = \"100KB/10s,10Kb\"}
+           }
+
+  message_in {global = \"100/10s\"
+              zone.default = \"100/10s\"
+              bucket.bucket1 {zone = default
+                              aggregated = \"100/10s,1000\"
+                              per_client = \"100/10s,100\"}
+             }
+
+  connection {global = \"100/10s\"
+              zone.default = \"100/10s\"
+              bucket.bucket1 {zone = default
+                              aggregated = \"100/10s,100\"
+                              per_client = \"100/10s,10\"
+                             }
+             }
+
+  message_routing {global = \"100/10s\"
+                    zone.default = \"100/10s\"
+                    bucket.bucket1 {zone = default
+                                    aggregated = \"100/10s,100\"
+                                    per_client = \"100/10s,10\"
+                                   }
+                  }
+}""">>).
+
+-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
+
+-record(client_options, { interval :: non_neg_integer()
+                        , per_cost :: non_neg_integer()
+                        , type :: atom()
+                        , bucket :: atom()
+                        , lifetime :: non_neg_integer()
+                        , rates :: list(tuple())
+                        }).
+
+-record(client_state, { client :: emqx_limiter_client:limiter()
+                      , pid :: pid()
+                      , got :: non_neg_integer()
+                      , options :: #client_options{}}).
+
+%%--------------------------------------------------------------------
+%% Setups
+%%--------------------------------------------------------------------
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    ok = emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF),
+    emqx_ct_helpers:start_apps([?APP]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([?APP]).
+
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+%%--------------------------------------------------------------------
+%% Test Cases
+%%--------------------------------------------------------------------
+t_un_overload(_) ->
+    Conf = emqx:get_config([emqx_limiter]),
+    Conn = #{global => to_rate("infinity"),
+             zone => #{z1 => to_rate("1000/1s"),
+                       z2 => to_rate("1000/1s")},
+             bucket => #{b1 => #{zone => z1,
+                                 aggregated => to_bucket_rate("100/1s, 500"),
+                                 per_client => to_bucket_rate("10/1s, 50")},
+                         b2 => #{zone => z2,
+                                 aggregated => to_bucket_rate("500/1s, 500"),
+                                 per_client => to_bucket_rate("100/1s, infinity")
+                                }}},
+    Conf2 = Conf#{connection => Conn},
+    emqx_config:put([emqx_limiter], Conf2),
+    {ok, _} = emqx_limiter_manager:restart_server(connection),
+
+    timer:sleep(200),
+
+    B1C = #client_options{interval = 100,
+                          per_cost = 1,
+                          type = connection,
+                          bucket = b1,
+                          lifetime = timer:seconds(3),
+                          rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]},
+                                   {fun erlang:'=:='/2, ["10/1s"]}]},
+
+    B2C = #client_options{interval = 100,
+                          per_cost = 10,
+                          type = connection,
+                          bucket = b2,
+                          lifetime = timer:seconds(3),
+                          rates = [{fun erlang:'=<'/2, ["1000/1s", "500/1s"]},
+                                   {fun erlang:'=:='/2, ["100/1s"]}]},
+
+    lists:foreach(fun(_) -> start_client(B1C) end,
+                  lists:seq(1, 10)),
+
+
+    lists:foreach(fun(_) -> start_client(B2C) end,
+                  lists:seq(1, 5)),
+
+    ?assert(check_client_result(10 + 5)).
+
+t_infinity(_) ->
+    Conf = emqx:get_config([emqx_limiter]),
+    Conn = #{global => to_rate("infinity"),
+             zone => #{z1 => to_rate("1000/1s"),
+                       z2 => to_rate("infinity")},
+             bucket => #{b1 => #{zone => z1,
+                                 aggregated => to_bucket_rate("100/1s, infinity"),
+                                 per_client => to_bucket_rate("10/1s, 100")},
+                         b2 => #{zone => z2,
+                                 aggregated => to_bucket_rate("infinity, 600"),
+                                 per_client => to_bucket_rate("100/1s, infinity")
+                                }}},
+    Conf2 = Conf#{connection => Conn},
+    emqx_config:put([emqx_limiter], Conf2),
+    {ok, _} = emqx_limiter_manager:restart_server(connection),
+
+    timer:sleep(200),
+
+    B1C = #client_options{interval = 100,
+                          per_cost = 1,
+                          type = connection,
+                          bucket = b1,
+                          lifetime = timer:seconds(3),
+                          rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]},
+                                   {fun erlang:'=:='/2, ["10/1s"]}]},
+
+    B2C = #client_options{interval = 100,
+                          per_cost = 10,
+                          type = connection,
+                          bucket = b2,
+                          lifetime = timer:seconds(3),
+                          rates = [{fun erlang:'=:='/2, ["100/1s"]}]},
+
+    lists:foreach(fun(_) -> start_client(B1C) end,
+                  lists:seq(1, 8)),
+
+    lists:foreach(fun(_) -> start_client(B2C) end,
+                  lists:seq(1, 4)),
+
+    ?assert(check_client_result(8 + 4)).
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+start_client(Opts) ->
+    Pid = self(),
+    erlang:spawn(fun() -> enter_client(Opts, Pid) end).
+
+enter_client(#client_options{type = Type,
+                             bucket = Bucket,
+                             lifetime = Lifetime} = Opts,
+             Pid) ->
+    erlang:send_after(Lifetime, self(), stop),
+    erlang:send(self(), consume),
+    Client = emqx_limiter_server:connect(Type, Bucket),
+    client_loop(#client_state{client = Client,
+                              pid = Pid,
+                              got = 0,
+                              options = Opts}).
+
+client_loop(#client_state{client = Client,
+                          got = Got,
+                          pid = Pid,
+                          options = #client_options{interval = Interval,
+                                                    per_cost = PerCost,
+                                                    lifetime = Lifetime,
+                                                    rates = Rates}} = State) ->
+    receive
+        consume ->
+            case emqx_limiter_client:consume(PerCost, Client) of
+                {ok, Client2} ->
+                    erlang:send_after(Interval, self(), consume),
+                    client_loop(State#client_state{client = Client2,
+                                                   got = Got + PerCost});
+                {pause, MS, Client2} ->
+                    erlang:send_after(MS, self(), {resume, erlang:system_time(millisecond)}),
+                    client_loop(State#client_state{client = Client2})
+            end;
+        stop ->
+            Rate = Got * emqx_limiter_schema:minimum_period() / Lifetime,
+            ?LOGT("Got:~p, Rate is:~p Checks:~p~n", [Got, Rate, Rate]),
+            Check = check_rates(Rate, Rates),
+            erlang:send(Pid, {client, Check});
+        {resume, Begin} ->
+            case emqx_limiter_client:consume(PerCost, Client) of
+                {ok, Client2} ->
+                    Now = erlang:system_time(millisecond),
+                    Diff = erlang:max(0, Interval - (Now - Begin)),
+                    erlang:send_after(Diff, self(), consume),
+                    client_loop(State#client_state{client = Client2,
+                                                   got = Got + PerCost});
+                {pause, MS, Client2} ->
+                    erlang:send_after(MS, self(), {resume, Begin}),
+                    client_loop(State#client_state{client = Client2})
+            end
+    end.
+
+check_rates(Rate, [{Fun, Rates} | T]) ->
+    case lists:all(fun(E) -> Fun(Rate, to_rate(E)) end, Rates) of
+        true ->
+            check_rates(Rate, T);
+        false ->
+            false
+    end;
+check_rates(_, _) ->
+    true.
+
+check_client_result(0) ->
+    true;
+
+check_client_result(N) ->
+    ?LOGT("check_client_result:~p~n", [N]),
+    receive
+        {client, true} ->
+            check_client_result(N - 1);
+        {client, false} ->
+            false;
+        Any ->
+            ?LOGT(">>>> other:~p~n", [Any])
+
+    after 3500 ->
+            ?LOGT(">>>> timeout~n", []),
+            false
+    end.
+
+to_rate(Str) ->
+    {ok, Rate} = emqx_limiter_schema:to_rate(Str),
+    Rate.
+
+to_bucket_rate(Str) ->
+    {ok, Result} = emqx_limiter_schema:to_bucket_rate(Str),
+    Result.

+ 1 - 0
apps/emqx_machine/src/emqx_machine_schema.erl

@@ -54,6 +54,7 @@
         , emqx_rule_engine_schema
         , emqx_exhook_schema
         , emqx_psk_schema
+        , emqx_limiter_schema
         ]).
 
 namespace() -> undefined.

+ 1 - 0
rebar.config.erl

@@ -279,6 +279,7 @@ relx_apps(ReleaseType) ->
     , emqx_statsd
     , emqx_prometheus
     , emqx_psk
+    , emqx_limiter
     ]
     ++ [quicer || is_quicer_supported()]
     ++ [emqx_license || is_enterprise()]