Browse Source

Add '[{fullsweep_after, 10}]' opts and 'force_gc_count' to tune the memory usage

Feng Lee 9 years ago
parent
commit
7d65ad42ad
3 changed files with 50 additions and 30 deletions
  1. 18 14
      src/emqttd_client.erl
  2. 15 5
      src/emqttd_session.erl
  3. 17 11
      src/emqttd_ws_client.erl

+ 18 - 14
src/emqttd_client.erl

@@ -55,7 +55,7 @@
 %% Unused fields: connname, peerhost, peerport
 -record(client_state, {connection, peername, conn_state, await_recv,
                        rate_limit, packet_size, parser, proto_state,
-                       keepalive, enable_stats}).
+                       keepalive, enable_stats, force_gc_count}).
 
 -define(INFO_KEYS, [peername, conn_state, await_recv]).
 
@@ -66,7 +66,7 @@
                         [esockd_net:format(State#client_state.peername) | Args])).
 
 start_link(Conn, Env) ->
-    {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
+    {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]], [{fullsweep_after, 10}])}.
 
 info(CPid) ->
     gen_server2:call(CPid, info).
@@ -114,15 +114,17 @@ do_init(Conn, Env, Peername) ->
     Parser = emqttd_parser:initial_state(PacketSize),
     ProtoState = emqttd_protocol:init(Peername, SendFun, Env),
     EnableStats = get_value(client_enable_stats, Env, false),
-    State = run_socket(#client_state{connection   = Conn,
-                                     peername     = Peername,
-                                     await_recv   = false,
-                                     conn_state   = running,
-                                     rate_limit   = RateLimit,
-                                     packet_size  = PacketSize,
-                                     parser       = Parser,
-                                     proto_state  = ProtoState,
-                                     enable_stats = EnableStats}),
+    ForceGcCount = emqttd_gc:conn_max_gc_count(),
+    State = run_socket(#client_state{connection     = Conn,
+                                     peername       = Peername,
+                                     await_recv     = false,
+                                     conn_state     = running,
+                                     rate_limit     = RateLimit,
+                                     packet_size    = PacketSize,
+                                     parser         = Parser,
+                                     proto_state    = ProtoState,
+                                     enable_stats   = EnableStats,
+                                     force_gc_count = ForceGcCount}),
     IdleTimout = get_value(client_idle_timeout, Env, 30000),
     gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
                            {backoff, 1000, 1000, 10000}).
@@ -147,7 +149,7 @@ prioritise_info(Msg, _Len, _State) ->
     case Msg of {redeliver, _} -> 5; _ -> 0 end.
 
 handle_pre_hibernate(State) ->
-    {hibernate, emit_stats(State)}.
+    {hibernate, emit_stats(emqttd_gc:reset_conn_gc_count(State))}.
 
 handle_call(info, From, State = #client_state{proto_state = ProtoState}) ->
     ProtoInfo  = emqttd_protocol:info(ProtoState),
@@ -237,7 +239,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
     shutdown(Reason, State);
 
 handle_info({inet_reply, _Sock, ok}, State) ->
-    {noreply, State, hibernate};
+    {noreply, gc(State), hibernate}; %% Tune GC
 
 handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
     shutdown(Reason, State);
@@ -291,7 +293,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% Receive and parse tcp data
 received(<<>>, State) ->
-    {noreply, State, hibernate};
+    {noreply, gc(State), hibernate};
 
 received(Bytes, State = #client_state{parser      = Parser,
                                       packet_size = PacketSize,
@@ -370,3 +372,5 @@ shutdown(Reason, State) ->
 stop(Reason, State) ->
     {stop, Reason, State}.
 
+gc(State) ->
+    emqttd_gc:maybe_force_gc(#client_state.force_gc_count, State).

+ 15 - 5
src/emqttd_session.erl

@@ -147,6 +147,9 @@
          %% Enable Stats
          enable_stats :: boolean(),
 
+         %% Force GC Count
+         force_gc_count :: undefined | integer(),
+
          created_at :: erlang:timestamp()
         }).
 
@@ -157,7 +160,8 @@
 -define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid,
                      next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
                      max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
-                     await_rel_timeout, expiry_interval, enable_stats, created_at]).
+                     await_rel_timeout, expiry_interval, enable_stats, force_gc_count,
+                     created_at]).
 
 -define(LOG(Level, Format, Args, State),
             lager:Level([{client, State#state.client_id}],
@@ -166,7 +170,8 @@
 %% @doc Start a Session
 -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
 start_link(CleanSess, {ClientId, Username}, ClientPid) ->
-    gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
+    gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid],
+                           [{fullsweep_after, 10}]). %% Tune GC.
 
 %%--------------------------------------------------------------------
 %% PubSub API
@@ -280,6 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
     {ok, QEnv} = emqttd:env(queue),
     MaxInflight = get_value(max_inflight, Env, 0),
     EnableStats = get_value(enable_stats, Env, false),
+    ForceGcCount = emqttd_gc:conn_max_gc_count(),
     MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
     State = #state{clean_sess        = CleanSess,
                    binding           = binding(ClientPid),
@@ -298,6 +304,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
                    max_awaiting_rel  = get_value(max_awaiting_rel, Env),
                    expiry_interval   = get_value(expiry_interval, Env),
                    enable_stats      = EnableStats,
+                   force_gc_count    = ForceGcCount,
                    created_at        = os:timestamp()},
     emqttd_sm:register_session(ClientId, CleanSess, info(State)),
     emqttd_hooks:run('session.created', [ClientId, Username]),
@@ -334,7 +341,7 @@ prioritise_info(Msg, _Len, _State) ->
     end.
 
 handle_pre_hibernate(State) ->
-    {hibernate, emit_stats(State)}.
+    {hibernate, emit_stats(emqttd_gc:reset_conn_gc_count(State))}.
 
 handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From,
             State = #state{awaiting_rel      = AwaitingRel,
@@ -443,7 +450,7 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
      case maps:take(PacketId, AwaitingRel) of
          {Msg, AwaitingRel1} ->
              spawn(emqttd_server, publish, [Msg]), %%:)
-             State#state{awaiting_rel = AwaitingRel1};
+             gc(State#state{awaiting_rel = AwaitingRel1});
          error ->
              ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
              emqttd_metrics:inc('packets/pubrel/missed'),
@@ -521,7 +528,7 @@ handle_cast(Msg, State) ->
 
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
-    {noreply, dispatch(tune_qos(Topic, Msg, State), State), hibernate};
+    {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};
 
 %% Do nothing if the client has been disconnected.
 handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
@@ -808,3 +815,6 @@ hibernate(State) ->
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
 
+gc(State) ->
+    emqttd_gc:maybe_force_gc(#state.force_gc_count, State).
+

+ 17 - 11
src/emqttd_ws_client.erl

@@ -49,7 +49,7 @@
 
 %% WebSocket Client State
 -record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive,
-                         enable_stats}).
+                         enable_stats, force_gc_count}).
 
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
@@ -59,7 +59,8 @@
 
 %% @doc Start WebSocket Client.
 start_link(Env, WsPid, Req, ReplyChannel) ->
-    gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []).
+    gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel],
+                           [{fullsweep_after, 10}]). %% Tune GC.
 
 info(CPid) ->
     gen_server2:call(CPid, info).
@@ -93,11 +94,13 @@ init([Env, WsPid, Req, ReplyChannel]) ->
                                       [{ws_initial_headers, Headers} | Env]),
     IdleTimeout = get_value(client_idle_timeout, Env, 30000),
     EnableStats = get_value(client_enable_stats, Env, false),
-    {ok, #wsclient_state{ws_pid       = WsPid,
-                         peername     = Peername,
-                         connection   = Req:get(connection),
-                         proto_state  = ProtoState,
-                         enable_stats = EnableStats},
+    ForceGcCount = emqttd_gc:conn_max_gc_count(),
+    {ok, #wsclient_state{ws_pid         = WsPid,
+                         peername       = Peername,
+                         connection     = Req:get(connection),
+                         proto_state    = ProtoState,
+                         enable_stats   = EnableStats,
+                         force_gc_count = ForceGcCount},
      IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}.
 
 prioritise_call(Msg, _From, _Len, _State) ->
@@ -108,7 +111,7 @@ prioritise_info(Msg, _Len, _State) ->
 
 handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) ->
     erlang:garbage_collect(WsPid),%%TODO: [{async, RequestId}]??
-    {hibernate, emit_stats(State)}.
+    {hibernate, emqttd_gc:reset_conn_gc_count(emit_stats(State))}.
 
 handle_call(info, From, State = #wsclient_state{peername    = Peername,
                                                 proto_state = ProtoState}) ->
@@ -135,7 +138,7 @@ handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState
     emqttd_metrics:received(Packet),
     case emqttd_protocol:received(Packet, ProtoState) of
         {ok, ProtoState1} ->
-            {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate};
+            {noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate};
         {error, Error} ->
             ?WSLOG(error, "Protocol error - ~p", [Error], State),
             shutdown(Error, State);
@@ -172,7 +175,7 @@ handle_info({deliver, Message}, State) ->
     with_proto(
       fun(ProtoState) ->
           emqttd_protocol:send(Message, ProtoState)
-      end, State);
+      end, gc(State));
 
 handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
     with_proto(
@@ -277,6 +280,9 @@ reply(Reply, State) ->
 shutdown(Reason, State) ->
     stop({shutdown, Reason}, State).
 
-stop(Reason, State ) ->
+stop(Reason, State) ->
     {stop, Reason, State}.
 
+gc(State) ->
+    emqttd_gc:maybe_force_gc(#wsclient_state.force_gc_count, State).
+