Forráskód Böngészése

Merge pull request #10668 from lafirest/fix/max_conn_rate

fix(limiter): fix an error when setting `max_conn_rate` in a listener
lafirest 2 éve
szülő
commit
d3a7d6d9d8

+ 2 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -286,7 +286,8 @@ default_client_config() ->
 default_bucket_config() ->
     #{
         rate => infinity,
-        burst => 0
+        burst => 0,
+        initial => 0
     }.
 
 get_listener_opts(Conf) ->

+ 28 - 21
apps/emqx/src/emqx_listeners.erl

@@ -347,7 +347,8 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
     Type == tcp; Type == ssl
 ->
     Id = listener_id(Type, ListenerName),
-    add_limiter_bucket(Id, Opts),
+    Limiter = limiter(Opts),
+    add_limiter_bucket(Id, Limiter),
     esockd:open(
         Id,
         ListenOn,
@@ -356,7 +357,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
             #{
                 listener => {Type, ListenerName},
                 zone => zone(Opts),
-                limiter => limiter(Opts),
+                limiter => Limiter,
                 enable_authn => enable_authn(Opts)
             }
         ]}
@@ -366,9 +367,10 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
     Type == ws; Type == wss
 ->
     Id = listener_id(Type, ListenerName),
-    add_limiter_bucket(Id, Opts),
+    Limiter = limiter(Opts),
+    add_limiter_bucket(Id, Limiter),
     RanchOpts = ranch_opts(Type, ListenOn, Opts),
-    WsOpts = ws_opts(Type, ListenerName, Opts),
+    WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
     case Type of
         ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
@@ -415,20 +417,22 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
                         Password -> [{password, str(Password)}]
                     end ++
                     optional_quic_listener_opts(Opts),
+            Limiter = limiter(Opts),
             ConnectionOpts = #{
                 conn_callback => emqx_quic_connection,
                 peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
                 peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
                 zone => zone(Opts),
                 listener => {quic, ListenerName},
-                limiter => limiter(Opts)
+                limiter => Limiter
             },
             StreamOpts = #{
                 stream_callback => emqx_quic_stream,
                 active => 1
             },
+
             Id = listener_id(quic, ListenerName),
-            add_limiter_bucket(Id, Opts),
+            add_limiter_bucket(Id, Limiter),
             quicer:start_listener(
                 Id,
                 ListenOn,
@@ -532,12 +536,12 @@ esockd_opts(ListenerId, Type, Opts0) ->
         end
     ).
 
-ws_opts(Type, ListenerName, Opts) ->
+ws_opts(Type, ListenerName, Opts, Limiter) ->
     WsPaths = [
         {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
             zone => zone(Opts),
             listener => {Type, ListenerName},
-            limiter => limiter(Opts),
+            limiter => Limiter,
             enable_authn => enable_authn(Opts)
         }}
     ],
@@ -653,26 +657,29 @@ zone(Opts) ->
 limiter(Opts) ->
     emqx_limiter_schema:get_listener_opts(Opts).
 
-add_limiter_bucket(Id, #{limiter := Limiter}) ->
+add_limiter_bucket(_Id, undefined) ->
+    ok;
+add_limiter_bucket(Id, Limiter) ->
     maps:fold(
         fun(Type, Cfg, _) ->
             emqx_limiter_server:add_bucket(Id, Type, Cfg)
         end,
         ok,
         maps:without([client], Limiter)
-    );
-add_limiter_bucket(_Id, _Cfg) ->
-    ok.
+    ).
 
-del_limiter_bucket(Id, #{limiter := Limiters}) ->
-    lists:foreach(
-        fun(Type) ->
-            emqx_limiter_server:del_bucket(Id, Type)
-        end,
-        maps:keys(Limiters)
-    );
-del_limiter_bucket(_Id, _Cfg) ->
-    ok.
+del_limiter_bucket(Id, Conf) ->
+    case limiter(Conf) of
+        undefined ->
+            ok;
+        Limiter ->
+            lists:foreach(
+                fun(Type) ->
+                    emqx_limiter_server:del_bucket(Id, Type)
+                end,
+                maps:keys(Limiter)
+            )
+    end.
 
 enable_authn(Opts) ->
     maps:get(enable_authn, Opts, true).