Browse Source

Merge branch 'emqx30' into more-gc-enforcement-policies

Feng Lee 7 years ago
parent
commit
02ddcf37cd
6 changed files with 146 additions and 15 deletions
  1. 21 2
      priv/emqx.schema
  2. 13 3
      src/emqx_connection.erl
  3. 52 6
      src/emqx_misc.erl
  4. 13 3
      src/emqx_session.erl
  5. 46 0
      test/emqx_misc_tests.erl
  6. 1 1
      test/emqx_session_SUITE.erl

+ 21 - 2
priv/emqx.schema

@@ -832,6 +832,17 @@ end}.
    {datatype, string}
  ]}.
 
+%% @doc Max message queue length and total heap size to force shutdown
+%% connection/session process.
+%% Message queue here is the Erlang process mailbox, but not the number
+%% of queued MQTT messages of QoS 1 and 2.
+%% Total heap size is the in Erlang 'words' not in 'bytes'.
+%% Zero or negative is to disable.
+{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
+  {default, "0 | 0MB"},
+  {datatype, string}
+]}.
+
 {translation, "emqx.zones", fun(Conf) ->
   Mapping = fun("retain_available", Val) ->
                     {mqtt_retain_available, Val};
@@ -848,7 +859,16 @@ end}.
                                        #{bytes => Bytes1, count => list_to_integer(Count)}
                                end,
                     {force_gc_policy, GcPolicy};
-
+               ("force_shutdown_policy", Val) ->
+                    [Len, Siz] = string:tokens(Val, "| "),
+                    ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of
+                                   {error, Reason} ->
+                                       error(Reason);
+                                   Siz1 ->
+                                         #{message_queue_len => list_to_integer(Len),
+                                           total_heap_size   => Siz1}
+                               end,
+                    {force_shutdown_policy, ShutdownPolicy};
                (Opt, Val) ->
                     {list_to_atom(Opt), Val}
             end,
@@ -1769,4 +1789,3 @@ end}.
    {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
 end}.
 
-

+ 13 - 3
src/emqx_connection.erl

@@ -152,6 +152,7 @@ init([Transport, RawSocket, Options]) ->
                                      }),
             GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
             ok = emqx_gc:init(GcPolicy),
+            erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->
@@ -214,9 +215,18 @@ handle_info({timeout, Timer, emit_stats},
                            proto_state = ProtoState
                           }) ->
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
-    ok = emqx_gc:reset(),
-    {noreply, State#state{stats_timer = undefined}, hibernate};
-
+    NewState = State#state{stats_timer = undefined},
+    Limits = erlang:get(force_shutdown_policy),
+    case emqx_misc:conn_proc_mng_policy(Limits) of
+        continue ->
+            {noreply, NewState};
+        hibernate ->
+            ok = emqx_gc:reset(),
+            {noreply, NewState, hibernate};
+        {shutdown, Reason} ->
+            ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
+            shutdown(Reason, NewState)
+    end;
 handle_info(timeout, State) ->
     shutdown(idle_timeout, State);
 

+ 52 - 6
src/emqx_misc.erl

@@ -15,7 +15,7 @@
 -module(emqx_misc).
 
 -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
-         proc_name/2, proc_stats/0, proc_stats/1]).
+         proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]).
 
 %% @doc Merge options
 -spec(merge_opts(list(), list()) -> list()).
@@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) ->
     erlang:start_timer(Interval, Dest, Msg).
 
 -spec(cancel_timer(undefined | reference()) -> ok).
-cancel_timer(undefined) ->
-    ok;
-cancel_timer(Timer) ->
-    case catch erlang:cancel_timer(Timer) of
+cancel_timer(Timer) when is_reference(Timer) ->
+    case erlang:cancel_timer(Timer) of
         false ->
             receive {timeout, Timer, _} -> ok after 0 -> ok end;
         _ -> ok
-    end.
+    end;
+cancel_timer(_) -> ok.
 
 -spec(proc_name(atom(), pos_integer()) -> atom()).
 proc_name(Mod, Id) ->
@@ -59,3 +58,50 @@ proc_stats(Pid) ->
     {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
     [{mailbox_len, V} | Stats1].
 
+-define(DISABLED, 0).
+
+%% @doc Check self() process status against connection/session process management policy,
+%% return `continue | hibernate | {shutdown, Reason}' accordingly.
+%% `continue': There is nothing out of the ordinary.
+%% `hibernate': Nothing to process in my mailbox, and since this check is triggered
+%%              by a timer, we assume it is a fat chance to continue idel, hence hibernate.
+%% `shutdown': Some numbers (message queue length or heap size have hit the limit),
+%%             hence shutdown for greater good (system stability).
+-spec(conn_proc_mng_policy(#{message_queue_len := integer(),
+                             total_heap_size := integer()
+                            } | undefined) -> continue | hibernate | {shutdown, _}).
+conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen,
+                       total_heap_size := MaxTotalHeapSize
+                      }) ->
+    Qlength = proc_info(message_queue_len),
+    Checks =
+        [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end,
+          {shutdown, message_queue_too_long}},
+         {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end,
+          {shutdown, total_heap_size_too_large}},
+         {fun() -> Qlength > 0 end, continue},
+         {fun() -> true end, hibernate}
+        ],
+    check(Checks);
+conn_proc_mng_policy(_) ->
+    %% disable by default
+    conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}).
+
+check([{Pred, Result} | Rest]) ->
+    case Pred() of
+        true -> Result;
+        false -> check(Rest)
+    end.
+
+is_message_queue_too_long(Qlength, Max) ->
+    is_enabled(Max) andalso Qlength > Max.
+
+is_heap_size_too_large(Max) ->
+    is_enabled(Max) andalso proc_info(total_heap_size) > Max.
+
+is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
+
+proc_info(Key) ->
+    {Key, Value} = erlang:process_info(self(), Key),
+    Value.
+

+ 13 - 3
src/emqx_session.erl

@@ -357,6 +357,7 @@ init([Parent, #{zone        := Zone,
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     ok = emqx_gc:init(GcPolicy),
+    erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 
@@ -574,9 +575,18 @@ handle_info({timeout, Timer, emit_stats},
             State = #state{client_id = ClientId,
                            stats_timer = Timer}) ->
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
-    ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
-    {noreply, State#state{stats_timer = undefined}, hibernate};
-
+    NewState = State#state{stats_timer = undefined},
+    Limits = erlang:get(force_shutdown_policy),
+    case emqx_misc:conn_proc_mng_policy(Limits) of
+        continue ->
+            {noreply, NewState};
+        hibernate ->
+            ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
+            {noreply, NewState, hibernate};
+        {shutdown, Reason} ->
+            ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
+            shutdown(Reason, NewState)
+    end;
 handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
     ?LOG(info, "expired, shutdown now:(", [], State),
     shutdown(expired, State);

+ 46 - 0
test/emqx_misc_tests.erl

@@ -0,0 +1,46 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_misc_tests).
+-include_lib("eunit/include/eunit.hrl").
+
+timer_cancel_flush_test() ->
+    Timer = emqx_misc:start_timer(0, foo),
+    ok = emqx_misc:cancel_timer(Timer),
+    receive {timeout, Timer, foo} -> error(unexpected)
+    after 0 -> ok
+    end.
+
+shutdown_disabled_test() ->
+    self() ! foo,
+    ?assertEqual(continue, conn_proc_mng_policy(0, 0)),
+    receive foo -> ok end,
+    ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)).
+
+message_queue_too_long_test() ->
+    self() ! foo,
+    self() ! bar,
+    ?assertEqual({shutdown, message_queue_too_long},
+                 conn_proc_mng_policy(1, 0)),
+    receive foo -> ok end,
+    ?assertEqual(continue, conn_proc_mng_policy(1, 0)),
+    receive bar -> ok end.
+
+total_heap_size_too_large_test() ->
+    ?assertEqual({shutdown, total_heap_size_too_large},
+                 conn_proc_mng_policy(0, 1)).
+
+conn_proc_mng_policy(L, S) ->
+    emqx_misc:conn_proc_mng_policy(#{message_queue_len => L,
+                                     total_heap_size => S}).

+ 1 - 1
test/emqx_session_SUITE.erl

@@ -25,7 +25,7 @@ all() -> [t_session_all].
 init_per_suite(Config) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     Config.
-    
+
 end_per_suite(_Config) ->
     emqx_ct_broker_helpers:run_teardown_steps().