Преглед на файлове

Rewrite emqx_gc.erl

The implementation prior to this commit supports
only one gc enforcement policy which is message count threshold.
The new implementation introduces 1 more: volume threshold based
spring2maz преди 7 години
родител
ревизия
721f237bc4
променени са 6 файла, в които са добавени 190 реда и са изтрити 45 реда
  1. 2 0
      .gitignore
  2. 14 0
      priv/emqx.schema
  3. 24 10
      src/emqx_connection.erl
  4. 73 24
      src/emqx_gc.erl
  5. 24 11
      src/emqx_session.erl
  6. 53 0
      test/emqx_gc_tests.erl

+ 2 - 0
.gitignore

@@ -35,3 +35,5 @@ bbmustache/
 etc/gen.emqx.conf
 compile_commands.json
 cuttlefish
+rebar.lock
+xrefr

+ 14 - 0
priv/emqx.schema

@@ -824,6 +824,14 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+%% @doc Force connection/session process GC after this number of
+%% messages | bytes passed through.
+%% Numbers delimited by `|'. Zero or negative is to disable.
+{mapping, "zone.$name.force_gc_policy", "emqx.zones", [
+   {default, "0|0"},
+   {datatype, string}
+ ]}.
+
 {translation, "emqx.zones", fun(Conf) ->
   Mapping = fun("retain_available", Val) ->
                     {mqtt_retain_available, Val};
@@ -831,6 +839,10 @@ end}.
                     {mqtt_wildcard_subscription, Val};
                ("shared_subscription", Val) ->
                     {mqtt_shared_subscription, Val};
+               ("force_gc_policy", Val) ->
+                    [Count, Bytes] = string:tokens(Val, "| "),
+                    {force_gc_policy, #{count => list_to_integer(Count),
+                                        bytes => list_to_integer(Bytes)}};
                (Opt, Val) ->
                     {list_to_atom(Opt), Val}
             end,
@@ -1750,3 +1762,5 @@ end}.
    {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
    {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
 end}.
+
+

+ 24 - 10
src/emqx_connection.erl

@@ -148,7 +148,10 @@ init([Transport, RawSocket, Options]) ->
                                       proto_state   = ProtoState,
                                       parser_state  = ParserState,
                                       enable_stats  = EnableStats,
-                                      idle_timeout  = IdleTimout}),
+                                      idle_timeout  = IdleTimout
+                                     }),
+            GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
+            ok = emqx_gc:init(GcPolicy),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->
@@ -200,14 +203,18 @@ handle_cast(Msg, State) ->
 handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
     case emqx_protocol:deliver(PubOrAck, ProtoState) of
         {ok, ProtoState1} ->
-            {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
+            State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}),
+            ok = maybe_gc(State1, PubOrAck),
+            {noreply, State1};
         {error, Reason} ->
             shutdown(Reason, State)
     end;
-
 handle_info({timeout, Timer, emit_stats},
-            State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
+            State = #state{stats_timer = Timer,
+                           proto_state = ProtoState
+                          }) ->
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
+    ok = emqx_gc:reset(),
     {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info(timeout, State) ->
@@ -290,9 +297,10 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 %% Receive and parse data
-handle_packet(<<>>, State) ->
-    {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
-
+handle_packet(<<>>, State0) ->
+    State = ensure_stats_timer(ensure_rate_limit(State0)),
+    ok = maybe_gc(State, incoming),
+    {noreply, State};
 handle_packet(Data, State = #state{proto_state  = ProtoState,
                                    parser_state = ParserState,
                                    idle_timeout = IdleTimeout}) ->
@@ -376,7 +384,13 @@ shutdown(Reason, State) ->
 stop(Reason, State) ->
     {stop, Reason, State}.
 
-maybe_gc(State) ->
-    %% TODO: gc and shutdown policy
-    State.
+%% For incoming messages, bump gc-stats with packet count and totoal volume
+%% For outgoing messages, only 'publish' type is taken into account.
+maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) ->
+    ok = emqx_gc:inc(Cnt, Oct);
+maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
+    Oct = iolist_size(Payload),
+    ok = emqx_gc:inc(1, Oct);
+maybe_gc(_, _) ->
+    ok.
 

+ 73 - 24
src/emqx_gc.erl

@@ -12,38 +12,87 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 
-%% GC Utility functions.
+%% @doc This module manages an opaque collection of statistics data used to
+%% force garbage collection on `self()' process when hitting thresholds.
+%% Namely:
+%% (1) Total number of messages passed through
+%% (2) Total data volume passed through
+%% @end
 
 -module(emqx_gc).
 
-%% Memory: (10, 100, 1000)
-%%
+-author("Feng Lee <feng@emqtt.io>").
+
+-export([init/1, inc/2, reset/0]).
+
+-type st() :: #{ cnt => {integer(), integer()}
+               , oct => {integer(), integer()}
+               }.
+
+-define(disabled, disabled).
+-define(ENABLED(X), (is_integer(X) andalso X > 0)).
+
+%% @doc Initialize force GC parameters.
+-spec init(false | map()) -> ok.
+init(#{count := Count, bytes := Bytes}) ->
+    Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
+    Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
+    erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)),
+    ok;
+init(_) -> erlang:put(?MODULE, #{}), ok.
 
--export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
-         maybe_force_gc/3]).
+%% @doc Increase count and bytes stats in one call,
+%% ensure gc is triggered at most once, even if both thresholds are hit.
+-spec inc(pos_integer(), pos_integer()) -> ok.
+inc(Cnt, Oct) ->
+    mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end).
 
--spec(conn_max_gc_count() -> integer()).
-conn_max_gc_count() ->
-    case emqx_config:get_env(conn_force_gc_count) of
-        I when is_integer(I), I > 0 -> I + rand:uniform(I);
-        I when is_integer(I), I =< 0 -> undefined;
-        undefined -> undefined
+%% @doc Reset counters to zero.
+-spec reset() -> ok.
+reset() ->
+    mutate_pd_with(fun(St) -> reset(St) end).
+
+%% ======== Internals ========
+
+%% mutate gc stats numbers in process dict with the given function
+mutate_pd_with(F) ->
+    St = F(erlang:get(?MODULE)),
+    erlang:put(?MODULE, St),
+    ok.
+
+%% Increase count and bytes stats in one call,
+%% ensure gc is triggered at most once, even if both thresholds are hit.
+-spec inc(st(), pos_integer(), pos_integer()) -> st().
+inc(St0, Cnt, Oct) ->
+    case do_inc(St0, cnt, Cnt) of
+        {true, St} ->
+            St;
+        {false, St1} ->
+            {_, St} = do_inc(St1, oct, Oct),
+            St
     end.
 
--spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()).
-reset_conn_gc_count(Pos, State) ->
-    case element(Pos, State) of
-        undefined -> State;
-        _I        -> setelement(Pos, State, conn_max_gc_count())
+%% Reset counters to zero.
+reset(St) -> reset(cnt, reset(oct, St)).
+
+-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}.
+do_inc(St, Key, Num) ->
+    case maps:get(Key, St, ?disabled) of
+        ?disabled ->
+            {false, St};
+        {Init, Remain} when Remain > Num ->
+            {false, maps:put(Key, {Init, Remain - Num}, St)};
+        _ ->
+            {true, do_gc(St)}
     end.
 
-maybe_force_gc(Pos, State) ->
-    maybe_force_gc(Pos, State, fun() -> ok end).
-maybe_force_gc(Pos, State, Cb) ->
-    case element(Pos, State) of
-        undefined     -> State;
-        I when I =< 0 -> Cb(), garbage_collect(),
-                         reset_conn_gc_count(Pos, State);
-        I             -> setelement(Pos, State, I - 1)
+do_gc(St) ->
+    erlang:garbage_collect(),
+    reset(St).
+
+reset(Key, St) ->
+    case maps:get(Key, St, ?disabled) of
+        ?disabled -> St;
+        {Init, _} -> maps:put(Key, {Init, Init}, St)
     end.
 

+ 24 - 11
src/emqx_session.erl

@@ -350,10 +350,13 @@ init([Parent, #{zone        := Zone,
                    enable_stats      = get_env(Zone, enable_stats, true),
                    deliver_stats     = 0,
                    enqueue_stats     = 0,
-                   created_at        = os:timestamp()},
+                   created_at        = os:timestamp()
+                  },
     emqx_sm:register_session(ClientId, attrs(State)),
     emqx_sm:set_session_stats(ClientId, stats(State)),
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
+    GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
+    ok = emqx_gc:init(GcPolicy),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 
@@ -567,8 +570,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer
 handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
     noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
 
-handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
+handle_info({timeout, Timer, emit_stats},
+            State = #state{client_id = ClientId,
+                           stats_timer = Timer}) ->
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
+    ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
     {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
@@ -744,21 +750,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
     end;
 
 %% Deliver qos0 message directly to client
-dispatch(Msg = #message{qos = ?QOS0}, State) ->
+dispatch(Msg = #message{qos = ?QOS0} = Msg, State) ->
     deliver(undefined, Msg, State),
-    inc_stats(deliver, State);
+    inc_stats(deliver, Msg, State);
 
-dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
+dispatch(Msg = #message{qos = QoS} = Msg,
+         State = #state{next_pkt_id = PacketId, inflight = Inflight})
   when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
     case emqx_inflight:is_full(Inflight) of
         true -> enqueue_msg(Msg, State);
         false ->
             deliver(PacketId, Msg, State),
-            await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State)))
+            await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State)))
     end.
 
 enqueue_msg(Msg, State = #state{mqueue = Q}) ->
-    inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
+    inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
 
 %%------------------------------------------------------------------------------
 %% Deliver
@@ -882,11 +889,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
 %%------------------------------------------------------------------------------
 %% Inc stats
 
-inc_stats(deliver, State = #state{deliver_stats = I}) ->
+inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
+    MsgSize = msg_size(Msg),
+    ok = emqx_gc:inc(1, MsgSize),
     State#state{deliver_stats = I + 1};
-inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
+inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
     State#state{enqueue_stats = I + 1}.
 
+%% Take only the payload size into account, add other fields if necessary
+msg_size(#message{payload = Payload}) -> payload_size(Payload).
+
+%% Payload should be binary(), but not 100% sure. Need dialyzer!
+payload_size(Payload) -> erlang:iolist_size(Payload).
+
 %%------------------------------------------------------------------------------
 %% Helper functions
 
@@ -902,5 +917,3 @@ noreply(State) ->
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
 
-%% TODO: GC Policy and Shutdown Policy
-%% maybe_gc(State) -> State.

+ 53 - 0
test/emqx_gc_tests.erl

@@ -0,0 +1,53 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_gc_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+trigger_by_cnt_test() ->
+    Args = #{count => 2, bytes => 0},
+    ok = emqx_gc:init(Args),
+    ok = emqx_gc:inc(1, 1000),
+    St1 = inspect(),
+    ?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)),
+    ok = emqx_gc:inc(2, 2),
+    St2 = inspect(),
+    ok = emqx_gc:inc(0, 2000),
+    St3 = inspect(),
+    ?assertEqual(St2, St3),
+    ?assertMatch({N, N}, maps:get(cnt, St2)),
+    ?assertNot(maps:is_key(oct, St2)),
+    ok.
+
+trigger_by_oct_test() ->
+    Args = #{count => 2, bytes => 2},
+    ok = emqx_gc:init(Args),
+    ok = emqx_gc:inc(1, 1),
+    St1 = inspect(),
+    ?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)),
+    ok = emqx_gc:inc(2, 2),
+    St2 = inspect(),
+    ?assertMatch({N, N}, maps:get(oct, St2)),
+    ?assertMatch({M, M}, maps:get(cnt, St2)),
+    ok.
+
+disabled_test() ->
+    Args = #{count => -1, bytes => false},
+    ok = emqx_gc:init(Args),
+    ok = emqx_gc:inc(1, 1),
+    ?assertEqual(#{}, inspect()),
+    ok.
+
+inspect() -> erlang:get(emqx_gc).