Feng 10 лет назад
Родитель
Сommit
47710c36aa
1 измененных файлов с 75 добавлено и 51 удалено
  1. 75 51
      src/emqttd_client.erl

+ 75 - 51
src/emqttd_client.erl

@@ -20,7 +20,7 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% MQTT Client
+%%% MQTT Client Connection.
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
@@ -52,7 +52,7 @@
                 conn_name,
                 await_recv,
                 conn_state,
-                conserve,
+                rate_limiter,
                 parser,
                 proto_state,
                 packet_opts,
@@ -85,22 +85,26 @@ unsubscribe(CPid, Topics) ->
 
 init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
     % Transform if ssl.
-    {ok, NewSock} = esockd_connection:accept(SockArgs),
+    {ok, NewSock}  = esockd_connection:accept(SockArgs),
+    %%TODO:...
+    {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
+    io:format("~p~n", [BufSizes]),
     {ok, Peername} = emqttd_net:peername(Sock),
-    {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
-    SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
-    PktOpts = proplists:get_value(packet, MqttEnv),
+    {ok, ConnStr}  = emqttd_net:connection_string(Sock, inbound),
+    SendFun    = send_fun(Transport, NewSock),
+    PktOpts    = proplists:get_value(packet, MqttEnv),
     ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
-    State = control_throttle(#state{transport    = Transport,
-                                    socket       = NewSock,
-                                    peername     = Peername,
-                                    conn_name    = ConnStr,
-                                    await_recv   = false,
-                                    conn_state   = running,
-                                    conserve     = false,
-                                    packet_opts  = PktOpts,
-                                    parser       = emqttd_parser:new(PktOpts),
-                                    proto_state  = ProtoState}),
+    Limiter    = proplists:get_value(rate_limiter, MqttEnv),
+    State      = run_socket(#state{transport    = Transport,
+                                   socket       = NewSock,
+                                   peername     = Peername,
+                                   conn_name    = ConnStr,
+                                   await_recv   = false,
+                                   conn_state   = running,
+                                   rate_limiter = Limiter,
+                                   packet_opts  = PktOpts,
+                                   parser       = emqttd_parser:new(PktOpts),
+                                   proto_state  = ProtoState}),
     ClientOpts = proplists:get_value(client, MqttEnv),
     IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
     gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
@@ -146,20 +150,26 @@ handle_info({redeliver, {?PUBREL, PacketId}},  State = #state{proto_state = Prot
     {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
     noreply(State#state{proto_state = ProtoState1});
 
-handle_info({inet_reply, _Ref, ok}, State) ->
-    noreply(State);
+handle_info(activate_sock, State) ->
+    noreply(run_socket(State#state{conn_state = running}));
 
 handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
+    Size = size(Data),
     lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
-    emqttd_metrics:inc('bytes/received', size(Data)),
-    received(Data, control_throttle(State #state{await_recv = false}));
+    emqttd_metrics:inc('bytes/received', Size),
+    received(Data, rate_limit(Size, State#state{await_recv = false}));
 
 handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
+    %%TODO: ...
     network_error(Reason, State);
 
+handle_info({inet_reply, _Ref, ok}, State) ->
+    %%TODO: ok...
+    io:format("inet_reply ok~n"),
+    noreply(State);
+
 handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
-    ?ERROR("Unexpected inet_reply - ~p", [Reason], State),
-    {noreply, State};
+    network_error(Reason, State);
 
 handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
     ?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State),
@@ -174,14 +184,14 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport
 
 handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of
-    {ok, KeepAlive1} ->
-        noreply(State#state{keepalive = KeepAlive1});
-    {error, timeout} ->
-        ?DEBUG("Keepalive Timeout!", [], State),
-        stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
-    {error, Error} ->
-        ?DEBUG("Keepalive Error - ~p", [Error], State),
-        stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
+        {ok, KeepAlive1} ->
+            noreply(State#state{keepalive = KeepAlive1});
+        {error, timeout} ->
+            ?DEBUG("Keepalive Timeout!", [], State),
+            stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
+        {error, Error} ->
+            ?DEBUG("Keepalive Error - ~p", [Error], State),
+            stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
     end;
 
 handle_info(Info, State) ->
@@ -223,27 +233,27 @@ with_session(Fun, State = #state{proto_state = ProtoState}) ->
 
 %% receive and parse tcp data
 received(<<>>, State) ->
-    {noreply, State, hibernate};
+    noreply(State);
 
-received(Bytes, State = #state{packet_opts = PacketOpts,
-                               parser      = Parser,
+received(Bytes, State = #state{parser      = Parser,
+                               packet_opts = PacketOpts,
                                proto_state = ProtoState}) ->
     case catch Parser(Bytes) of
         {more, NewParser} ->
-            noreply(control_throttle(State#state{parser = NewParser}));
+            noreply(run_socket(State#state{parser = NewParser}));
         {ok, Packet, Rest} ->
             emqttd_metrics:received(Packet),
             case emqttd_protocol:received(Packet, ProtoState) of
-            {ok, ProtoState1} ->
-                received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
-                                           proto_state = ProtoState1});
-            {error, Error} ->
-                ?ERROR("Protocol error - ~p", [Error], State),
-                stop({shutdown, Error}, State);
-            {error, Error, ProtoState1} ->
-                stop({shutdown, Error}, State#state{proto_state = ProtoState1});
-            {stop, Reason, ProtoState1} ->
-                stop(Reason, State#state{proto_state = ProtoState1})
+                {ok, ProtoState1} ->
+                    received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
+                                               proto_state = ProtoState1});
+                {error, Error} ->
+                    ?ERROR("Protocol error - ~p", [Error], State),
+                    stop({shutdown, Error}, State);
+                {error, Error, ProtoState1} ->
+                    stop({shutdown, Error}, State#state{proto_state = ProtoState1});
+                {stop, Reason, ProtoState1} ->
+                    stop(Reason, State#state{proto_state = ProtoState1})
             end;
         {error, Error} ->
             ?ERROR("Framing error - ~p", [Error], State),
@@ -258,6 +268,20 @@ network_error(Reason, State = #state{peername = Peername}) ->
                       [emqttd_net:format(Peername), Reason]),
     stop({shutdown, conn_closed}, State).
 
+rate_limit(_Size, State = #state{rate_limiter = undefined}) ->
+    run_socket(State);
+rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) ->
+    {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
+    io:format("~p~n", [BufSizes]),
+    case esockd_rate_limiter:check(Limiter, Size) of
+        {0, Limiter1} ->
+            run_socket(State#state{conn_state = running, rate_limiter = Limiter1});
+        {Pause, Limiter1} ->
+            ?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State),
+            erlang:send_after(Pause, self(), activate_sock),
+            State#state{conn_state = blocked, rate_limiter = Limiter1}    
+    end.
+
 run_socket(State = #state{conn_state = blocked}) ->
     State;
 run_socket(State = #state{await_recv = true}) ->
@@ -266,11 +290,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
     Transport:async_recv(Sock, 0, infinity),
     State#state{await_recv = true}.
 
-control_throttle(State = #state{conn_state = Flow,
-                                conserve   = Conserve}) ->
-    case {Flow, Conserve} of
-        {running,   true} -> State #state{conn_state = blocked};
-        {blocked,  false} -> run_socket(State #state{conn_state = running});
-        {_,            _} -> run_socket(State)
+send_fun(Transport, Sock) ->
+    fun(Data) ->
+        try Transport:port_command(Sock, Data) of
+            true -> ok
+        catch
+            error:Error -> exit({socket_error, Error})
+        end
     end.
-