|
|
@@ -1013,17 +1013,23 @@ check_limiter(
|
|
|
Data,
|
|
|
WhenOk,
|
|
|
Msgs,
|
|
|
- #state{limiter_timer = undefined, limiter = Limiter} = State
|
|
|
+ #state{channel = Channel, limiter_timer = undefined, limiter = Limiter} = State
|
|
|
) ->
|
|
|
case emqx_limiter_container:check_list(Needs, Limiter) of
|
|
|
{ok, Limiter2} ->
|
|
|
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
|
|
|
{pause, Time, Limiter2} ->
|
|
|
- ?SLOG(debug, #{
|
|
|
- msg => "pause_time_due_to_rate_limit",
|
|
|
- needs => Needs,
|
|
|
- time_in_ms => Time
|
|
|
- }),
|
|
|
+ ?SLOG_THROTTLE(
|
|
|
+ warning,
|
|
|
+ #{
|
|
|
+ msg => socket_receive_paused_by_rate_limit,
|
|
|
+ paused_ms => Time
|
|
|
+ },
|
|
|
+ #{
|
|
|
+ tag => "RATE",
|
|
|
+ clientid => emqx_channel:info(clientid, Channel)
|
|
|
+ }
|
|
|
+ ),
|
|
|
|
|
|
Retry = #retry{
|
|
|
types = [Type || {_, Type} <- Needs],
|
|
|
@@ -1057,7 +1063,7 @@ check_limiter(
|
|
|
|
|
|
%% try to perform a retry
|
|
|
-spec retry_limiter(state()) -> _.
|
|
|
-retry_limiter(#state{limiter = Limiter} = State) ->
|
|
|
+retry_limiter(#state{channel = Channel, limiter = Limiter} = State) ->
|
|
|
#retry{types = Types, data = Data, next = Next} =
|
|
|
emqx_limiter_container:get_retry_context(Limiter),
|
|
|
case emqx_limiter_container:retry_list(Types, Limiter) of
|
|
|
@@ -1071,11 +1077,17 @@ retry_limiter(#state{limiter = Limiter} = State) ->
|
|
|
}
|
|
|
);
|
|
|
{pause, Time, Limiter2} ->
|
|
|
- ?SLOG(debug, #{
|
|
|
- msg => "pause_time_due_to_rate_limit",
|
|
|
- types => Types,
|
|
|
- time_in_ms => Time
|
|
|
- }),
|
|
|
+ ?SLOG_THROTTLE(
|
|
|
+ warning,
|
|
|
+ #{
|
|
|
+ msg => socket_receive_paused_by_rate_limit,
|
|
|
+ paused_ms => Time
|
|
|
+ },
|
|
|
+ #{
|
|
|
+ tag => "RATE",
|
|
|
+ clientid => emqx_channel:info(clientid, Channel)
|
|
|
+ }
|
|
|
+ ),
|
|
|
|
|
|
TRef = start_timer(Time, limit_timeout),
|
|
|
|