Browse Source

Implement a new 'emqx_gc' module (#2090)

Update connection/session module to using the new emqx_gc API
Feng Lee 7 years ago
parent
commit
367b717c40
3 changed files with 94 additions and 76 deletions
  1. 20 22
      src/emqx_connection.erl
  2. 55 46
      src/emqx_gc.erl
  3. 19 8
      src/emqx_session.erl

+ 20 - 22
src/emqx_connection.erl

@@ -40,10 +40,10 @@
           active_n,
           active_n,
           proto_state,
           proto_state,
           parser_state,
           parser_state,
+          gc_state,
           keepalive,
           keepalive,
           enable_stats,
           enable_stats,
           stats_timer,
           stats_timer,
-          incoming,
           rate_limit,
           rate_limit,
           pub_limit,
           pub_limit,
           limit_timer,
           limit_timer,
@@ -138,6 +138,8 @@ init([Transport, RawSocket, Options]) ->
                                               peercert => Peercert,
                                               peercert => Peercert,
                                               sendfun  => SendFun}, Options),
                                               sendfun  => SendFun}, Options),
             ParserState = emqx_protocol:parser(ProtoState),
             ParserState = emqx_protocol:parser(ProtoState),
+            GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
+            GcState = emqx_gc:init(GcPolicy),
             State = run_socket(#state{transport    = Transport,
             State = run_socket(#state{transport    = Transport,
                                       socket       = Socket,
                                       socket       = Socket,
                                       peername     = Peername,
                                       peername     = Peername,
@@ -147,11 +149,10 @@ init([Transport, RawSocket, Options]) ->
                                       pub_limit    = PubLimit,
                                       pub_limit    = PubLimit,
                                       proto_state  = ProtoState,
                                       proto_state  = ProtoState,
                                       parser_state = ParserState,
                                       parser_state = ParserState,
+                                      gc_state     = GcState,
                                       enable_stats = EnableStats,
                                       enable_stats = EnableStats,
                                       idle_timeout = IdleTimout
                                       idle_timeout = IdleTimout
                                      }),
                                      }),
-            GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
-            ok = emqx_gc:init(GcPolicy),
             ok = emqx_misc:init_proc_mng_policy(Zone),
             ok = emqx_misc:init_proc_mng_policy(Zone),
             emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
             emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
@@ -205,9 +206,8 @@ handle_cast(Msg, State) ->
 handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
 handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
     case emqx_protocol:deliver(PubOrAck, ProtoState) of
     case emqx_protocol:deliver(PubOrAck, ProtoState) of
         {ok, ProtoState1} ->
         {ok, ProtoState1} ->
-            State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}),
-            ok = maybe_gc(State1, PubOrAck),
-            {noreply, State1};
+            State1 = State#state{proto_state = ProtoState1},
+            {noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))};
         {error, Reason} ->
         {error, Reason} ->
             shutdown(Reason, State)
             shutdown(Reason, State)
     end;
     end;
@@ -247,11 +247,10 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
 
 
 handle_info({tcp, _Sock, Data}, State) ->
 handle_info({tcp, _Sock, Data}, State) ->
     ?LOG(debug, "RECV ~p", [Data]),
     ?LOG(debug, "RECV ~p", [Data]),
-    Size = iolist_size(Data),
-    emqx_metrics:trans(inc, 'bytes/received', Size),
-    emqx_pd:update_counter(incoming_bytes, Size),
-    Incoming = #{bytes => Size, packets => 0},
-    handle_packet(Data, State#state{incoming = Incoming});
+    Oct = iolist_size(Data),
+    emqx_pd:update_counter(incoming_bytes, Oct),
+    emqx_metrics:trans(inc, 'bytes/received', Oct),
+    handle_packet(Data, maybe_gc({1, Oct}, State));
 
 
 %% Rate limit here, cool:)
 %% Rate limit here, cool:)
 handle_info({tcp_passive, _Sock}, State) ->
 handle_info({tcp_passive, _Sock}, State) ->
@@ -325,9 +324,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 %% Receive and parse data
 %% Receive and parse data
 handle_packet(<<>>, State) ->
 handle_packet(<<>>, State) ->
-    NState = ensure_stats_timer(State),
-    ok = maybe_gc(NState, incoming),
-    {noreply, NState};
+    {noreply, ensure_stats_timer(State)};
 
 
 handle_packet(Data, State = #state{proto_state  = ProtoState,
 handle_packet(Data, State = #state{proto_state  = ProtoState,
                                    parser_state = ParserState,
                                    parser_state = ParserState,
@@ -407,14 +404,15 @@ shutdown(Reason, State) ->
 stop(Reason, State) ->
 stop(Reason, State) ->
     {stop, Reason, State}.
     {stop, Reason, State}.
 
 
-%% For incoming messages, bump gc-stats with packet count and totoal volume
-%% For outgoing messages, only 'publish' type is taken into account.
-maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) ->
-    ok = emqx_gc:inc(Cnt, Oct);
-maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
+maybe_gc(_, State = #state{gc_state = undefined}) ->
+    State;
+maybe_gc({publish, _PacketId, #message{payload = Payload}}, State) ->
     Oct = iolist_size(Payload),
     Oct = iolist_size(Payload),
-    ok = emqx_gc:inc(1, Oct);
-maybe_gc(_, _) ->
-    ok.
+    maybe_gc({1, Oct}, State);
+maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
+    {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
+    State#state{gc_state = GCSt1};
+maybe_gc(_, State) ->
+    State.
 
 
 
 

+ 55 - 46
src/emqx_gc.erl

@@ -21,74 +21,83 @@
 
 
 -module(emqx_gc).
 -module(emqx_gc).
 
 
--export([init/1, inc/2, reset/0]).
+-export([init/1, run/3, info/1, reset/1]).
 
 
--type st() :: #{ cnt => {integer(), integer()}
-               , oct => {integer(), integer()}
-               }.
+-type(opts() :: #{count => integer(),
+                  bytes => integer()}).
+
+-type(st() :: #{cnt => {integer(), integer()},
+                oct => {integer(), integer()}}).
+
+-type(gc_state() :: {?MODULE, st()}).
 
 
 -define(disabled, disabled).
 -define(disabled, disabled).
 -define(ENABLED(X), (is_integer(X) andalso X > 0)).
 -define(ENABLED(X), (is_integer(X) andalso X > 0)).
 
 
-%% @doc Initialize force GC parameters.
--spec init(false | map()) -> ok.
+%% @doc Initialize force GC state.
+-spec(init(opts() | false) -> gc_state() | undefined).
 init(#{count := Count, bytes := Bytes}) ->
 init(#{count := Count, bytes := Bytes}) ->
     Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
     Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
     Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
     Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
-    erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)),
-    ok;
-init(_) -> erlang:put(?MODULE, #{}), ok.
-
-%% @doc Increase count and bytes stats in one call,
-%% ensure gc is triggered at most once, even if both thresholds are hit.
--spec inc(pos_integer(), pos_integer()) -> ok.
-inc(Cnt, Oct) ->
-    mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end).
-
-%% @doc Reset counters to zero.
--spec reset() -> ok.
-reset() ->
-    mutate_pd_with(fun(St) -> reset(St) end).
-
-%% ======== Internals ========
-
-%% mutate gc stats numbers in process dict with the given function
-mutate_pd_with(F) ->
-    St = F(erlang:get(?MODULE)),
-    erlang:put(?MODULE, St),
-    ok.
-
-%% Increase count and bytes stats in one call,
-%% ensure gc is triggered at most once, even if both thresholds are hit.
--spec inc(st(), pos_integer(), pos_integer()) -> st().
-inc(St0, Cnt, Oct) ->
-    case do_inc(St0, cnt, Cnt) of
-        {true, St} ->
-            St;
+    {?MODULE, maps:from_list(Cnt ++ Oct)};
+init(false) -> undefined.
+
+%% @doc Try to run GC based on reduntions of count or bytes.
+-spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}).
+run(Cnt, Oct, {?MODULE, St}) ->
+    {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St),
+    {Res, {?MODULE, St1}};
+run(_Cnt, _Oct, undefined) ->
+    {false, undefined}.
+
+run([], St) ->
+    {false, St};
+run([{K, N}|T], St) ->
+    case dec(K, N, St) of
+        {true, St1} ->
+            {true, do_gc(St1)};
         {false, St1} ->
         {false, St1} ->
-            {_, St} = do_inc(St1, oct, Oct),
-            St
+            run(T, St1)
     end.
     end.
 
 
-%% Reset counters to zero.
-reset(St) -> reset(cnt, reset(oct, St)).
+%% @doc Info of GC state.
+-spec(info(gc_state()) -> map() | undefined).
+info({?MODULE, St}) ->
+    St;
+info(undefined) ->
+    undefined.
 
 
--spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}.
-do_inc(St, Key, Num) ->
+%% @doc Reset counters to zero.
+-spec(reset(gc_state()) -> gc_state()).
+reset({?MODULE, St}) ->
+    {?MODULE, do_reset(St)};
+reset(undefined) ->
+    undefined.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+-spec(dec(cnt | oct, pos_integer(), st()) -> {boolean(), st()}).
+dec(Key, Num, St) ->
     case maps:get(Key, St, ?disabled) of
     case maps:get(Key, St, ?disabled) of
         ?disabled ->
         ?disabled ->
             {false, St};
             {false, St};
         {Init, Remain} when Remain > Num ->
         {Init, Remain} when Remain > Num ->
             {false, maps:put(Key, {Init, Remain - Num}, St)};
             {false, maps:put(Key, {Init, Remain - Num}, St)};
         _ ->
         _ ->
-            {true, do_gc(St)}
+            {true, St}
     end.
     end.
 
 
 do_gc(St) ->
 do_gc(St) ->
-    erlang:garbage_collect(),
-    reset(St).
+    true = erlang:garbage_collect(),
+    do_reset(St).
 
 
-reset(Key, St) ->
+do_reset(St) ->
+    do_reset(cnt, do_reset(oct, St)).
+
+%% Reset counters to zero.
+do_reset(Key, St) ->
     case maps:get(Key, St, ?disabled) of
     case maps:get(Key, St, ?disabled) of
         ?disabled -> St;
         ?disabled -> St;
         {Init, _} -> maps:put(Key, {Init, Init}, St)
         {Init, _} -> maps:put(Key, {Init, Init}, St)

+ 19 - 8
src/emqx_session.erl

@@ -144,6 +144,9 @@
           %% Enqueue stats
           %% Enqueue stats
           enqueue_stats = 0,
           enqueue_stats = 0,
 
 
+          %% GC State
+          gc_state,
+
           %% Created at
           %% Created at
           created_at :: erlang:timestamp(),
           created_at :: erlang:timestamp(),
 
 
@@ -344,6 +347,7 @@ init([Parent, #{zone            := Zone,
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
     true = link(ConnPid),
     true = link(ConnPid),
     emqx_logger:set_metadata_client_id(ClientId),
     emqx_logger:set_metadata_client_id(ClientId),
+    GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
     State = #state{idle_timeout      = IdleTimout,
     State = #state{idle_timeout      = IdleTimout,
                    clean_start       = CleanStart,
                    clean_start       = CleanStart,
@@ -364,6 +368,7 @@ init([Parent, #{zone            := Zone,
                    enable_stats      = get_env(Zone, enable_stats, true),
                    enable_stats      = get_env(Zone, enable_stats, true),
                    deliver_stats     = 0,
                    deliver_stats     = 0,
                    enqueue_stats     = 0,
                    enqueue_stats     = 0,
+                   gc_state          = emqx_gc:init(GcPolicy),
                    created_at        = os:timestamp(),
                    created_at        = os:timestamp(),
                    will_msg          = WillMsg
                    will_msg          = WillMsg
                   },
                   },
@@ -371,8 +376,6 @@ init([Parent, #{zone            := Zone,
     true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
     true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
     true = emqx_sm:set_session_stats(ClientId, stats(State)),
     true = emqx_sm:set_session_stats(ClientId, stats(State)),
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
-    GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
-    ok = emqx_gc:init(GcPolicy),
     ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
@@ -605,7 +608,9 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
     noreply(ensure_stats_timer(expire_awaiting_rel(State1)));
     noreply(ensure_stats_timer(expire_awaiting_rel(State1)));
 
 
 handle_info({timeout, Timer, emit_stats},
 handle_info({timeout, Timer, emit_stats},
-            State = #state{client_id = ClientId, stats_timer = Timer}) ->
+            State = #state{client_id = ClientId,
+                           stats_timer = Timer,
+                           gc_state = GcState}) ->
     emqx_metrics:commit(),
     emqx_metrics:commit(),
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
     NewState = State#state{stats_timer = undefined},
     NewState = State#state{stats_timer = undefined},
@@ -614,8 +619,9 @@ handle_info({timeout, Timer, emit_stats},
         continue ->
         continue ->
             {noreply, NewState};
             {noreply, NewState};
         hibernate ->
         hibernate ->
-            ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
-            {noreply, NewState, hibernate};
+            %% going to hibernate, reset gc stats
+            GcState1 = emqx_gc:reset(GcState),
+            {noreply, NewState#state{gc_state = GcState1}, hibernate};
         {shutdown, Reason} ->
         {shutdown, Reason} ->
             ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
             ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
             shutdown(Reason, NewState)
             shutdown(Reason, NewState)
@@ -991,9 +997,8 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
 %% Inc stats
 %% Inc stats
 
 
 inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
 inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
-    MsgSize = msg_size(Msg),
-    ok = emqx_gc:inc(1, MsgSize),
-    State#state{deliver_stats = I + 1};
+    State1 = maybe_gc({1, msg_size(Msg)}, State),
+    State1#state{deliver_stats = I + 1};
 inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
 inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
     State#state{enqueue_stats = I + 1}.
     State#state{enqueue_stats = I + 1}.
 
 
@@ -1018,3 +1023,9 @@ noreply(State) ->
 shutdown(Reason, State) ->
 shutdown(Reason, State) ->
     {stop, Reason, State}.
     {stop, Reason, State}.
 
 
+maybe_gc(_, State = #state{gc_state = undefined}) ->
+    State;
+maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
+    {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
+    State#state{gc_state = GCSt1}.
+