Просмотр исходного кода

Add connection/session shutdown policy

The hibernation behaviour is also changed (implicitly) in this commit:
Prior to this change, connection/session always hibernates after
the stats timer expires regardless of messages in mailbox.
After this commit, connection/session process only goes to hibernate
when the timer expires AND there is nothing left in the mailbox to
process
spring2maz 7 лет назад
Родитель
Сommit
25b61afe0d
6 измененных файлов с 154 добавлено и 7 удалено
  1. 1 0
      priv/emqx.schema
  2. 11 3
      src/emqx_connection.erl
  3. 40 1
      src/emqx_misc.erl
  4. 11 3
      src/emqx_session.erl
  5. 54 0
      test/emqx_misc_tests.erl
  6. 37 0
      test/emqx_test_lib.erl

+ 1 - 0
priv/emqx.schema

@@ -1767,6 +1767,7 @@ end}.
 %% @doc Max message queue length for connection/session process.
 %% NOTE: Message queue here is the Erlang process mailbox, but not
 %% the number of MQTT queued messages.
+%% Set to 0 or negative to have it disabled
 {mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [
   {default, 100000},
   {datatype, integer}

+ 11 - 3
src/emqx_connection.erl

@@ -214,9 +214,17 @@ handle_info({timeout, Timer, emit_stats},
                            proto_state = ProtoState
                           }) ->
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
-    ok = emqx_gc:reset(),
-    {noreply, State#state{stats_timer = undefined}, hibernate};
-
+    NewState = State#state{stats_timer = undefined},
+    case meqx_misc:conn_proc_mng_policy() of
+        continue ->
+            {noreply, NewState};
+        hibernate ->
+            ok = emqx_gc:reset(),
+            {noreply, NewState, hibernate};
+        {shutdown, Reason} ->
+            ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
+            shutdown(Reason, NewState)
+    end;
 handle_info(timeout, State) ->
     shutdown(idle_timeout, State);
 

+ 40 - 1
src/emqx_misc.erl

@@ -15,7 +15,7 @@
 -module(emqx_misc).
 
 -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
-         proc_name/2, proc_stats/0, proc_stats/1]).
+         proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]).
 
 %% @doc Merge options
 -spec(merge_opts(list(), list()) -> list()).
@@ -59,3 +59,42 @@ proc_stats(Pid) ->
     {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
     [{mailbox_len, V} | Stats1].
 
+-define(DISABLED, 0).
+
+%% @doc Check self() process status against connection/session process management policy,
+%% return `continue | hibernate | {shutdown, Reason}' accordingly.
+%% `continue': There is nothing out of the ordinary
+%% `hibernate': Nothing to process in my mailbox (and since this check is triggered
+%%              by a timer, we assume it is a fat chance to continue idel, hence hibernate.
+%% `shutdown': Some numbers (message queue length or heap size have hit the limit,
+%%             hence shutdown for greater good (system stability).
+-spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}).
+conn_proc_mng_policy() ->
+    MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED),
+    Qlength = proc_info(message_queue_len),
+    Checks =
+        [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end,
+          {shutdown, message_queue_too_long}},
+         {fun() -> is_heap_size_too_large() end,
+          {shutdown, total_heap_size_too_large}},
+         {fun() -> Qlength > 0 end, continue},
+         {fun() -> true end, hibernate}
+        ],
+    check(Checks).
+
+check([{Pred, Result} | Rest]) ->
+    case Pred() of
+        true -> Result;
+        false -> check(Rest)
+    end.
+
+is_heap_size_too_large() ->
+    MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED),
+    is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize.
+
+is_enabled(Max) -> Max > ?DISABLED.
+
+proc_info(Key) ->
+    {Key, Value} = erlang:process_info(self(), Key),
+    Value.
+

+ 11 - 3
src/emqx_session.erl

@@ -574,9 +574,17 @@ handle_info({timeout, Timer, emit_stats},
             State = #state{client_id = ClientId,
                            stats_timer = Timer}) ->
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
-    ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
-    {noreply, State#state{stats_timer = undefined}, hibernate};
-
+    NewState = State#state{stats_timer = undefined},
+    case emqx_misc:conn_proc_mng_policy() of
+        continue ->
+            {noreply, NewState};
+        hibernate ->
+            ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
+            {noreply, NewState, hibernate};
+        {shutdown, Reason} ->
+            ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
+            shutdown(Reason, NewState)
+    end;
 handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
     ?LOG(info, "expired, shutdown now:(", [], State),
     shutdown(expired, State);

+ 54 - 0
test/emqx_misc_tests.erl

@@ -0,0 +1,54 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_misc_tests).
+-include_lib("eunit/include/eunit.hrl").
+
+shutdown_disabled_test() ->
+    with_env(
+      [{conn_max_msg_queue_len, 0},
+       {conn_max_total_heap_size, 0}],
+      fun() ->
+          self() ! foo,
+          ?assertEqual(continue, conn_proc_mng_policy()),
+          receive foo -> ok end,
+          ?assertEqual(hibernate, conn_proc_mng_policy())
+      end).
+
+message_queue_too_long_test() ->
+    with_env(
+      [{conn_max_msg_queue_len, 1},
+       {conn_max_total_heap_size, 0}],
+      fun() ->
+          self() ! foo,
+          self() ! bar,
+          ?assertEqual({shutdown, message_queue_too_long},
+                       conn_proc_mng_policy()),
+          receive foo -> ok end,
+          ?assertEqual(continue, conn_proc_mng_policy()),
+          receive bar -> ok end
+      end).
+
+total_heap_size_too_large_test() ->
+    with_env(
+      [{conn_max_msg_queue_len, 0},
+       {conn_max_total_heap_size, 1}],
+      fun() ->
+          ?assertEqual({shutdown, total_heap_size_too_large},
+                       conn_proc_mng_policy())
+      end).
+
+with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F).
+
+conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy().

+ 37 - 0
test/emqx_test_lib.erl

@@ -0,0 +1,37 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_test_lib).
+
+-export([ with_env/2
+        ]).
+
+with_env([], F) -> F();
+with_env([{Key, Val} | Rest], F) ->
+    Origin = get_env(Key),
+    try
+        ok = set_env(Key, Val),
+        with_env(Rest, F)
+    after
+        case Origin of
+            undefined -> ok = unset_env(Key);
+            _ -> ok = set_env(Key, Origin)
+        end
+    end.
+
+get_env(Key) -> application:get_env(?APPLICATION, Key).
+set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val).
+unset_env(Key) -> application:unset_env(?APPLICATION, Key).
+
+