Просмотр исходного кода

fix(limiter): optimize the instance of limiter

We can reduce a limiter container with all types are `infinity` to just a `infinity` atom
firest 2 лет назад
Родитель
Сommit
c2e35a42b0

+ 2 - 2
apps/emqx/src/emqx_channel.erl

@@ -89,7 +89,7 @@
     %% Authentication Data Cache
     auth_cache :: maybe(map()),
     %% Quota checkers
-    quota :: maybe(emqx_limiter_container:limiter()),
+    quota :: emqx_limiter_container:limiter(),
     %% Timers
     timers :: #{atom() => disabled | maybe(reference())},
     %% Conn State
@@ -760,7 +760,7 @@ do_publish(
             handle_out(disconnect, RC, Channel)
     end.
 
-ensure_quota(_, Channel = #channel{quota = undefined}) ->
+ensure_quota(_, Channel = #channel{quota = infinity}) ->
     Channel;
 ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
     Cnt = lists:foldl(

+ 50 - 44
apps/emqx/src/emqx_connection.erl

@@ -111,7 +111,7 @@
     listener :: {Type :: atom(), Name :: atom()},
 
     %% Limiter
-    limiter :: maybe(limiter()),
+    limiter :: limiter(),
 
     %% limiter buffer for overload use
     limiter_buffer :: queue:queue(pending_req()),
@@ -974,55 +974,61 @@ handle_cast(Req, State) ->
     list(any()),
     state()
 ) -> _.
+
+check_limiter(
+    _Needs,
+    Data,
+    WhenOk,
+    Msgs,
+    #state{limiter = infinity} = State
+) ->
+    WhenOk(Data, Msgs, State);
 check_limiter(
     Needs,
     Data,
     WhenOk,
     Msgs,
-    #state{
-        limiter = Limiter,
-        limiter_timer = LimiterTimer,
-        limiter_buffer = Cache
-    } = State
-) when Limiter =/= undefined ->
-    case LimiterTimer of
-        undefined ->
-            case emqx_limiter_container:check_list(Needs, Limiter) of
-                {ok, Limiter2} ->
-                    WhenOk(Data, Msgs, State#state{limiter = Limiter2});
-                {pause, Time, Limiter2} ->
-                    ?SLOG(debug, #{
-                        msg => "pause_time_dueto_rate_limit",
-                        needs => Needs,
-                        time_in_ms => Time
-                    }),
-
-                    Retry = #retry{
-                        types = [Type || {_, Type} <- Needs],
-                        data = Data,
-                        next = WhenOk
-                    },
-
-                    Limiter3 = emqx_limiter_container:set_retry_context(Retry, Limiter2),
-
-                    TRef = start_timer(Time, limit_timeout),
-
-                    {ok, State#state{
-                        limiter = Limiter3,
-                        limiter_timer = TRef
-                    }};
-                {drop, Limiter2} ->
-                    {ok, State#state{limiter = Limiter2}}
-            end;
-        _ ->
-            %% if there has a retry timer,
-            %% cache the operation and execute it after the retry is over
-            %% the maximum length of the cache queue is equal to the active_n
-            New = #pending_req{need = Needs, data = Data, next = WhenOk},
-            {ok, State#state{limiter_buffer = queue:in(New, Cache)}}
+    #state{limiter_timer = undefined, limiter = Limiter} = State
+) ->
+    case emqx_limiter_container:check_list(Needs, Limiter) of
+        {ok, Limiter2} ->
+            WhenOk(Data, Msgs, State#state{limiter = Limiter2});
+        {pause, Time, Limiter2} ->
+            ?SLOG(debug, #{
+                msg => "pause_time_dueto_rate_limit",
+                needs => Needs,
+                time_in_ms => Time
+            }),
+
+            Retry = #retry{
+                types = [Type || {_, Type} <- Needs],
+                data = Data,
+                next = WhenOk
+            },
+
+            Limiter3 = emqx_limiter_container:set_retry_context(Retry, Limiter2),
+
+            TRef = start_timer(Time, limit_timeout),
+
+            {ok, State#state{
+                limiter = Limiter3,
+                limiter_timer = TRef
+            }};
+        {drop, Limiter2} ->
+            {ok, State#state{limiter = Limiter2}}
     end;
-check_limiter(_, Data, WhenOk, Msgs, State) ->
-    WhenOk(Data, Msgs, State).
+check_limiter(
+    Needs,
+    Data,
+    WhenOk,
+    _Msgs,
+    #state{limiter_buffer = Cache} = State
+) ->
+    %% if there has a retry timer,
+    %% cache the operation and execute it after the retry is over
+    %% the maximum length of the cache queue is equal to the active_n
+    New = #pending_req{need = Needs, data = Data, next = WhenOk},
+    {ok, State#state{limiter_buffer = queue:in(New, Cache)}}.
 
 %% try to perform a retry
 -spec retry_limiter(state()) -> _.

+ 34 - 11
apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl

@@ -34,16 +34,18 @@
 
 -export_type([container/0, check_result/0]).
 
--type container() :: #{
-    limiter_type() => undefined | limiter(),
-    %% the retry context of the limiter
-    retry_key() =>
-        undefined
-        | retry_context()
-        | future(),
-    %% the retry context of the container
-    retry_ctx := undefined | any()
-}.
+-type container() ::
+    infinity
+    | #{
+        limiter_type() => undefined | limiter(),
+        %% the retry context of the limiter
+        retry_key() =>
+            undefined
+            | retry_context()
+            | future(),
+        %% the retry context of the container
+        retry_ctx := undefined | any()
+    }.
 
 -type future() :: pos_integer().
 -type limiter_id() :: emqx_limiter_schema:limiter_id().
@@ -78,7 +80,20 @@ get_limiter_by_types(Id, Types, BucketCfgs) ->
         {ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfgs),
         add_new(Type, Limiter, Acc)
     end,
-    lists:foldl(Init, #{retry_ctx => undefined}, Types).
+    Container = lists:foldl(Init, #{retry_ctx => undefined}, Types),
+    case
+        lists:all(
+            fun(Type) ->
+                maps:get(Type, Container) =:= infinity
+            end,
+            Types
+        )
+    of
+        true ->
+            infinity;
+        _ ->
+            Container
+    end.
 
 -spec add_new(limiter_type(), limiter(), container()) -> container().
 add_new(Type, Limiter, Container) ->
@@ -89,11 +104,15 @@ add_new(Type, Limiter, Container) ->
 
 %% @doc check the specified limiter
 -spec check(pos_integer(), limiter_type(), container()) -> check_result().
+check(_Need, _Type, infinity) ->
+    {ok, infinity};
 check(Need, Type, Container) ->
     check_list([{Need, Type}], Container).
 
 %% @doc check multiple limiters
 -spec check_list(list({pos_integer(), limiter_type()}), container()) -> check_result().
+check_list(_Need, infinity) ->
+    {ok, infinity};
 check_list([{Need, Type} | T], Container) ->
     Limiter = maps:get(Type, Container),
     case emqx_htb_limiter:check(Need, Limiter) of
@@ -121,11 +140,15 @@ check_list([], Container) ->
 
 %% @doc retry the specified limiter
 -spec retry(limiter_type(), container()) -> check_result().
+retry(_Type, infinity) ->
+    {ok, infinity};
 retry(Type, Container) ->
     retry_list([Type], Container).
 
 %% @doc retry multiple limiters
 -spec retry_list(list(limiter_type()), container()) -> check_result().
+retry_list(_Types, infinity) ->
+    {ok, infinity};
 retry_list([Type | T], Container) ->
     Key = ?RETRY_KEY(Type),
     case Container of

+ 47 - 40
apps/emqx/src/emqx_ws_connection.erl

@@ -90,7 +90,7 @@
     listener :: {Type :: atom(), Name :: atom()},
 
     %% Limiter
-    limiter :: maybe(container()),
+    limiter :: container(),
 
     %% cache operation when overload
     limiter_cache :: queue:queue(cache()),
@@ -579,54 +579,61 @@ handle_timeout(TRef, TMsg, State) ->
     list(any()),
     state()
 ) -> state().
+check_limiter(
+    _Needs,
+    Data,
+    WhenOk,
+    Msgs,
+    #state{limiter = infinity} = State
+) ->
+    WhenOk(Data, Msgs, State);
 check_limiter(
     Needs,
     Data,
     WhenOk,
     Msgs,
-    #state{
-        limiter = Limiter,
-        limiter_timer = LimiterTimer,
-        limiter_cache = Cache
-    } = State
+    #state{limiter_timer = undefined, limiter = Limiter} = State
 ) ->
-    case LimiterTimer of
-        undefined ->
-            case emqx_limiter_container:check_list(Needs, Limiter) of
-                {ok, Limiter2} ->
-                    WhenOk(Data, Msgs, State#state{limiter = Limiter2});
-                {pause, Time, Limiter2} ->
-                    ?SLOG(debug, #{
-                        msg => "pause_time_due_to_rate_limit",
-                        needs => Needs,
-                        time_in_ms => Time
-                    }),
-
-                    Retry = #retry{
-                        types = [Type || {_, Type} <- Needs],
-                        data = Data,
-                        next = WhenOk
-                    },
+    case emqx_limiter_container:check_list(Needs, Limiter) of
+        {ok, Limiter2} ->
+            WhenOk(Data, Msgs, State#state{limiter = Limiter2});
+        {pause, Time, Limiter2} ->
+            ?SLOG(debug, #{
+                msg => "pause_time_due_to_rate_limit",
+                needs => Needs,
+                time_in_ms => Time
+            }),
+
+            Retry = #retry{
+                types = [Type || {_, Type} <- Needs],
+                data = Data,
+                next = WhenOk
+            },
 
-                    Limiter3 = emqx_limiter_container:set_retry_context(Retry, Limiter2),
+            Limiter3 = emqx_limiter_container:set_retry_context(Retry, Limiter2),
 
-                    TRef = start_timer(Time, limit_timeout),
+            TRef = start_timer(Time, limit_timeout),
 
-                    enqueue(
-                        {active, false},
-                        State#state{
-                            sockstate = blocked,
-                            limiter = Limiter3,
-                            limiter_timer = TRef
-                        }
-                    );
-                {drop, Limiter2} ->
-                    {ok, State#state{limiter = Limiter2}}
-            end;
-        _ ->
-            New = #cache{need = Needs, data = Data, next = WhenOk},
-            State#state{limiter_cache = queue:in(New, Cache)}
-    end.
+            enqueue(
+                {active, false},
+                State#state{
+                    sockstate = blocked,
+                    limiter = Limiter3,
+                    limiter_timer = TRef
+                }
+            );
+        {drop, Limiter2} ->
+            {ok, State#state{limiter = Limiter2}}
+    end;
+check_limiter(
+    Needs,
+    Data,
+    WhenOk,
+    _Msgs,
+    #state{limiter_cache = Cache} = State
+) ->
+    New = #cache{need = Needs, data = Data, next = WhenOk},
+    State#state{limiter_cache = queue:in(New, Cache)}.
 
 -spec retry_limiter(state()) -> state().
 retry_limiter(#state{limiter = Limiter} = State) ->

+ 19 - 11
apps/emqx/test/emqx_connection_SUITE.erl

@@ -38,8 +38,6 @@ init_per_suite(Config) ->
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
     ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
-    %% Meck Limiter
-    ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
     %% Meck Pd
     ok = meck:new(emqx_pd, [passthrough, no_history, no_link]),
     %% Meck Metrics
@@ -67,7 +65,6 @@ end_per_suite(_Config) ->
     ok = meck:unload(emqx_transport),
     catch meck:unload(emqx_channel),
     ok = meck:unload(emqx_cm),
-    ok = meck:unload(emqx_htb_limiter),
     ok = meck:unload(emqx_pd),
     ok = meck:unload(emqx_metrics),
     ok = meck:unload(emqx_hooks),
@@ -421,6 +418,14 @@ t_ensure_rate_limit(_) ->
     {ok, [], State1} = emqx_connection:check_limiter([], [], WhenOk, [], st(#{limiter => Limiter})),
     ?assertEqual(Limiter, emqx_connection:info(limiter, State1)),
 
+    ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
+
+    ok = meck:expect(
+        emqx_htb_limiter,
+        make_infinity_limiter,
+        fun() -> non_infinity end
+    ),
+
     ok = meck:expect(
         emqx_htb_limiter,
         check,
@@ -431,10 +436,10 @@ t_ensure_rate_limit(_) ->
         [],
         WhenOk,
         [],
-        st(#{limiter => Limiter})
+        st(#{limiter => init_limiter()})
     ),
     meck:unload(emqx_htb_limiter),
-    ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
+
     ?assertNotEqual(undefined, emqx_connection:info(limiter_timer, State2)).
 
 t_activate_socket(_) ->
@@ -707,7 +712,14 @@ init_limiter() ->
 
 limiter_cfg() ->
     Cfg = bucket_cfg(),
-    Client = #{
+    Client = client_cfg(),
+    #{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}.
+
+bucket_cfg() ->
+    #{rate => infinity, initial => 0, burst => 0}.
+
+client_cfg() ->
+    #{
         rate => infinity,
         initial => 0,
         burst => 0,
@@ -715,11 +727,7 @@ limiter_cfg() ->
         divisible => false,
         max_retry_time => timer:seconds(5),
         failure_strategy => force
-    },
-    #{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}.
-
-bucket_cfg() ->
-    #{rate => infinity, initial => 0, burst => 0}.
+    }.
 
 add_bucket() ->
     Cfg = bucket_cfg(),

+ 6 - 1
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -443,7 +443,12 @@ t_websocket_info_deliver(_) ->
 
 t_websocket_info_timeout_limiter(_) ->
     Ref = make_ref(),
-    LimiterT = init_limiter(),
+    {ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
+    LimiterT = init_limiter(#{
+        bytes => bucket_cfg(),
+        messages => bucket_cfg(),
+        client => #{bytes => client_cfg(Rate)}
+    }),
     Next = fun emqx_ws_connection:when_msg_in/3,
     Limiter = emqx_limiter_container:set_retry_context({retry, [], [], Next}, LimiterT),
     Event = {timeout, Ref, limit_timeout},