Browse Source

feature(ratelimit): support to set ratelimit & quota policy

JianBo He 5 năm trước cách đây
mục cha
commit
90b33b044d

+ 7 - 2
src/emqx_channel.erl

@@ -106,7 +106,7 @@
           await_timer  => expire_awaiting_rel,
           expire_timer => expire_session,
           will_timer   => will_message,
-          quota_timer  => reset_quota_flag
+          quota_timer  => expire_quota_limit
          }).
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@@ -855,6 +855,11 @@ handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
 handle_call(list_acl_cache, Channel) ->
     {reply, emqx_acl_cache:list_acl_cache(), Channel};
 
+handle_call({quota, Policy}, Channel) ->
+    Zone = info(zone, Channel),
+    Quota = emqx_limiter:init(Zone, Policy),
+    reply(ok, Channel#channel{quota = Quota});
+
 handle_call(Req, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     reply(ignored, Channel).
@@ -962,7 +967,7 @@ handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
     (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
     {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
 
-handle_timeout(_TRef, reset_quota_flag, Channel) ->
+handle_timeout(_TRef, expire_quota_limit, Channel) ->
     {ok, clean_timer(quota_timer, Channel)};
 
 handle_timeout(_TRef, Msg, Channel) ->

+ 5 - 0
src/emqx_connection.erl

@@ -455,6 +455,11 @@ handle_call(_From, info, State) ->
 handle_call(_From, stats, State) ->
     {reply, stats(State), State};
 
+handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) ->
+    Zone = emqx_channel:info(zone, Channel),
+    Limiter = emqx_limiter:init(Zone, Policy),
+    {reply, ok, State#state{limiter = Limiter}};
+
 handle_call(_From, Req, State = #state{channel = Channel}) ->
     case emqx_channel:handle_call(Req, Channel) of
         {reply, Reply, NChannel} ->

+ 3 - 5
src/emqx_limiter.erl

@@ -44,9 +44,7 @@
               | overall_messages_routing
               ).
 
--type(spec() :: {name(), esockd_rate_limit:config()}).
-
--type(specs() :: [spec()]).
+-type(policy() :: [{name(), esockd_rate_limit:config()}]).
 
 -type(info() :: #{name() :=
                   #{tokens   := non_neg_integer(),
@@ -61,7 +59,7 @@
 
 -spec(init(emqx_zone:zone(),
            maybe(esockd_rate_limit:config()),
-           maybe(esockd_rate_limit:config()), specs())
+           maybe(esockd_rate_limit:config()), policy())
      -> maybe(limiter())).
 init(Zone, PubLimit, BytesIn, Specs) ->
     Merged = maps:merge(#{conn_messages_in => PubLimit,
@@ -69,7 +67,7 @@ init(Zone, PubLimit, BytesIn, Specs) ->
     Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
     init(Zone, maps:to_list(Filtered)).
 
--spec(init(emqx_zone:zone(), specs()) -> maybe(limiter())).
+-spec(init(emqx_zone:zone(), policy()) -> maybe(limiter())).
 init(_Zone, []) ->
     undefined;
 init(Zone, Specs) ->

+ 5 - 0
src/emqx_ws_connection.erl

@@ -354,6 +354,11 @@ handle_call(From, stats, State) ->
     gen_server:reply(From, stats(State)),
     return(State);
 
+handle_call(_From, {ratelimit, Policy}, State = #state{channel = Channel}) ->
+    Zone = emqx_channel:info(zone, Channel),
+    Limiter = emqx_limiter:init(Zone, Policy),
+    {reply, ok, State#state{limiter = Limiter}};
+
 handle_call(From, Req, State = #state{channel = Channel}) ->
     case emqx_channel:handle_call(Req, Channel) of
         {reply, Reply, NChannel} ->

+ 6 - 3
test/emqx_channel_SUITE.erl

@@ -367,7 +367,7 @@ t_quota_qos0(_) ->
     {ok, Chann1} = emqx_channel:handle_in(Pub, Chann),
     {ok, Chann2} = emqx_channel:handle_in(Pub, Chann1),
     M1 = emqx_metrics:val('packets.publish.dropped') - 1,
-    {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
+    {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
     {ok, _} = emqx_channel:handle_in(Pub, Chann3),
     M1 = emqx_metrics:val('packets.publish.dropped') - 1,
 
@@ -383,7 +383,7 @@ t_quota_qos1(_) ->
     %% Quota per connections
     {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub, Chann),
     {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub, Chann1),
-    {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
+    {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
     {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
     %% Quota in overall
     {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
@@ -400,7 +400,7 @@ t_quota_qos2(_) ->
     %% Quota per connections
     {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub1, Chann),
     {ok, ?PUBREC_PACKET(2, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub2, Chann1),
-    {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
+    {ok, Chann3} = emqx_channel:handle_timeout(ref, expire_quota_limit, Chann2),
     {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
     %% Quota in overall
     {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
@@ -528,6 +528,9 @@ t_handle_call_takeover_end(_) ->
     {shutdown, takeovered, [], _, _Chan} =
         emqx_channel:handle_call({takeover, 'end'}, channel()).
 
+t_handle_call_quota(_) ->
+    {reply, ok, _Chan} = emqx_channel:handle_call({quota, [{conn_messages_routing, {100,1}}]}, channel()).
+
 t_handle_call_unexpected(_) ->
     {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
 

+ 3 - 1
test/emqx_connection_SUITE.erl

@@ -220,7 +220,9 @@ t_handle_call(_) ->
     St = st(),
     ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)),
     ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)),
-    ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)),
+    ?assertMatch({reply, _Stats, _NSt}, emqx_connection:handle_call(self(), stats, St)),
+    ?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, []}, St)),
+    ?assertMatch({reply, ok, _NSt}, emqx_connection:handle_call(self(), {ratelimit, [{conn_messages_in, {100, 1}}]}, St)),
     ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)),
     ?assertMatch({stop, {shutdown,kicked}, ok, _NSt}, emqx_connection:handle_call(self(), kick, St)).