|
|
@@ -249,12 +249,13 @@ handle_info({tcp, _Sock, Data}, State) ->
|
|
|
?LOG(debug, "RECV ~p", [Data]),
|
|
|
Size = iolist_size(Data),
|
|
|
emqx_metrics:trans(inc, 'bytes/received', Size),
|
|
|
+ emqx_pd:update_counter(incoming_bytes, Size),
|
|
|
Incoming = #{bytes => Size, packets => 0},
|
|
|
handle_packet(Data, State#state{incoming = Incoming});
|
|
|
|
|
|
%% Rate limit here, cool:)
|
|
|
handle_info({tcp_passive, _Sock}, State) ->
|
|
|
- {noreply, ensure_rate_limit(State)};
|
|
|
+ {noreply, run_socket(ensure_rate_limit(State))};
|
|
|
|
|
|
handle_info({tcp_error, _Sock, Reason}, State) ->
|
|
|
shutdown(Reason, State);
|
|
|
@@ -336,10 +337,10 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
|
|
|
{noreply, State#state{parser_state = ParserState1}, IdleTimeout};
|
|
|
{ok, Packet = ?PACKET(Type), Rest} ->
|
|
|
emqx_metrics:received(Packet),
|
|
|
+ (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
|
|
|
case emqx_protocol:received(Packet, ProtoState) of
|
|
|
{ok, ProtoState1} ->
|
|
|
- NewState = State#state{proto_state = ProtoState1},
|
|
|
- handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
|
|
|
+ handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
|
|
|
{error, Reason} ->
|
|
|
?LOG(error, "Process packet error - ~p", [Reason]),
|
|
|
shutdown(Reason, State);
|
|
|
@@ -360,28 +361,21 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
|
|
|
reset_parser(State = #state{proto_state = ProtoState}) ->
|
|
|
State#state{parser_state = emqx_protocol:parser(ProtoState)}.
|
|
|
|
|
|
-inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}})
|
|
|
- when Type == ?PUBLISH; Type == ?SUBSCRIBE ->
|
|
|
- State#state{incoming = Incoming#{packets := Cnt + 1}};
|
|
|
-
|
|
|
-inc_publish_cnt(_Type, State) ->
|
|
|
- State.
|
|
|
-
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Ensure rate limit
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl,
|
|
|
- incoming = #{packets := Packets, bytes := Bytes}}) ->
|
|
|
- ensure_rate_limit([{Pl, #state.pub_limit, Packets},
|
|
|
- {Rl, #state.rate_limit, Bytes}], State).
|
|
|
+ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) ->
|
|
|
+ Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)},
|
|
|
+ {Rl, #state.rate_limit, emqx_pd:reset_counter(incoming_bytes)}],
|
|
|
+ ensure_rate_limit(Limiters, State).
|
|
|
|
|
|
ensure_rate_limit([], State) ->
|
|
|
- run_socket(State);
|
|
|
-ensure_rate_limit([{undefined, _Pos, _Num}|Limiters], State) ->
|
|
|
+ State;
|
|
|
+ensure_rate_limit([{undefined, _Pos, _Cnt}|Limiters], State) ->
|
|
|
ensure_rate_limit(Limiters, State);
|
|
|
-ensure_rate_limit([{Rl, Pos, Num}|Limiters], State) ->
|
|
|
- case esockd_rate_limit:check(Num, Rl) of
|
|
|
+ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
|
|
|
+ case esockd_rate_limit:check(Cnt, Rl) of
|
|
|
{0, Rl1} ->
|
|
|
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
|
|
|
{Pause, Rl1} ->
|
|
|
@@ -423,3 +417,4 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
|
|
|
maybe_gc(_, _) ->
|
|
|
ok.
|
|
|
|
|
|
+
|