Procházet zdrojové kódy

Move shutdown policy config to zone configs

spring2maz před 7 roky
rodič
revize
f58165db73

+ 16 - 18
priv/emqx.schema

@@ -828,10 +828,21 @@ end}.
 %% messages | bytes passed through.
 %% Numbers delimited by `|'. Zero or negative is to disable.
 {mapping, "zone.$name.force_gc_policy", "emqx.zones", [
-   {default, "0|0"},
+   {default, "0 | 0"},
    {datatype, string}
  ]}.
 
+%% @doc Max message queue length and total heap size to force shutdown
+%% connection/session process.
+%% Message queue here is the Erlang process mailbox, but not the number
+%% of queued MQTT messages of QoS 1 and 2.
+%% Total heap size is the in Erlang 'words' not in 'bytes'.
+%% Zero or negative is to disable.
+{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
+  {default, "0 | 0"},
+  {datatype, string}
+]}.
+
 {translation, "emqx.zones", fun(Conf) ->
   Mapping = fun("retain_available", Val) ->
                     {mqtt_retain_available, Val};
@@ -843,6 +854,10 @@ end}.
                     [Count, Bytes] = string:tokens(Val, "| "),
                     {force_gc_policy, #{count => list_to_integer(Count),
                                         bytes => list_to_integer(Bytes)}};
+               ("force_shutdown_policy", Val) ->
+                    [Len, Siz] = string:tokens(Val, "| "),
+                    {force_shutdown_policy, #{message_queue_len => list_to_integer(Len),
+                                              total_heap_size => list_to_integer(Siz)}};
                (Opt, Val) ->
                     {list_to_atom(Opt), Val}
             end,
@@ -1763,20 +1778,3 @@ end}.
    {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
 end}.
 
-
-%% @doc Max message queue length for connection/session process.
-%% NOTE: Message queue here is the Erlang process mailbox, but not
-%% the number of MQTT queued messages.
-%% Set to 0 or negative to have it disabled
-{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [
-  {default, 100000},
-  {datatype, integer}
-]}.
-
-%% @doc Max total heap size to kill connection/session process.
-%% set to 0 or negative number to have it disabled.
-{mapping, "mqtt.conn.max_total_heap_size", "emqx.conn_max_total_heap_size", [
-  {default, -1}, %% disabled by default
-  {datatype, integer}
-]}.
-

+ 3 - 1
src/emqx_connection.erl

@@ -152,6 +152,7 @@ init([Transport, RawSocket, Options]) ->
                                      }),
             GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
             ok = emqx_gc:init(GcPolicy),
+            erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->
@@ -215,7 +216,8 @@ handle_info({timeout, Timer, emit_stats},
                           }) ->
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     NewState = State#state{stats_timer = undefined},
-    case meqx_misc:conn_proc_mng_policy() of
+    Limits = erlang:get(force_shutdown_policy),
+    case meqx_misc:conn_proc_mng_policy(Limits) of
         continue ->
             {noreply, NewState};
         hibernate ->

+ 22 - 14
src/emqx_misc.erl

@@ -15,7 +15,7 @@
 -module(emqx_misc).
 
 -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
-         proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]).
+         proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]).
 
 %% @doc Merge options
 -spec(merge_opts(list(), list()) -> list()).
@@ -62,24 +62,30 @@ proc_stats(Pid) ->
 
 %% @doc Check self() process status against connection/session process management policy,
 %% return `continue | hibernate | {shutdown, Reason}' accordingly.
-%% `continue': There is nothing out of the ordinary
-%% `hibernate': Nothing to process in my mailbox (and since this check is triggered
+%% `continue': There is nothing out of the ordinary.
+%% `hibernate': Nothing to process in my mailbox, and since this check is triggered
 %%              by a timer, we assume it is a fat chance to continue idel, hence hibernate.
-%% `shutdown': Some numbers (message queue length or heap size have hit the limit,
+%% `shutdown': Some numbers (message queue length or heap size have hit the limit),
 %%             hence shutdown for greater good (system stability).
--spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}).
-conn_proc_mng_policy() ->
-    MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED),
+-spec(conn_proc_mng_policy(#{message_queue_len := integer(),
+                             total_heap_size := integer()
+                            } | undefined) -> continue | hibernate | {shutdown, _}).
+conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen,
+                       total_heap_size := MaxTotalHeapSize
+                      }) ->
     Qlength = proc_info(message_queue_len),
     Checks =
-        [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end,
+        [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end,
           {shutdown, message_queue_too_long}},
-         {fun() -> is_heap_size_too_large() end,
+         {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end,
           {shutdown, total_heap_size_too_large}},
          {fun() -> Qlength > 0 end, continue},
          {fun() -> true end, hibernate}
         ],
-    check(Checks).
+    check(Checks);
+conn_proc_mng_policy(_) ->
+    %% disable by default
+    conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}).
 
 check([{Pred, Result} | Rest]) ->
     case Pred() of
@@ -87,11 +93,13 @@ check([{Pred, Result} | Rest]) ->
         false -> check(Rest)
     end.
 
-is_heap_size_too_large() ->
-    MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED),
-    is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize.
+is_message_queue_too_long(Qlength, Max) ->
+    is_enabled(Max) andalso Qlength > Max.
 
-is_enabled(Max) -> Max > ?DISABLED.
+is_heap_size_too_large(Max) ->
+    is_enabled(Max) andalso proc_info(total_heap_size) > Max.
+
+is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
 
 proc_info(Key) ->
     {Key, Value} = erlang:process_info(self(), Key),

+ 3 - 1
src/emqx_session.erl

@@ -357,6 +357,7 @@ init([Parent, #{zone        := Zone,
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     ok = emqx_gc:init(GcPolicy),
+    erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 
@@ -575,7 +576,8 @@ handle_info({timeout, Timer, emit_stats},
                            stats_timer = Timer}) ->
     _ = emqx_sm:set_session_stats(ClientId, stats(State)),
     NewState = State#state{stats_timer = undefined},
-    case emqx_misc:conn_proc_mng_policy() of
+    Limits = erlang:get(force_shutdown_policy),
+    case emqx_misc:conn_proc_mng_policy(Limits) of
         continue ->
             {noreply, NewState};
         hibernate ->

+ 16 - 31
test/emqx_misc_tests.erl

@@ -23,39 +23,24 @@ timer_cancel_flush_test() ->
     end.
 
 shutdown_disabled_test() ->
-    with_env(
-      [{conn_max_msg_queue_len, 0},
-       {conn_max_total_heap_size, 0}],
-      fun() ->
-          self() ! foo,
-          ?assertEqual(continue, conn_proc_mng_policy()),
-          receive foo -> ok end,
-          ?assertEqual(hibernate, conn_proc_mng_policy())
-      end).
+    self() ! foo,
+    ?assertEqual(continue, conn_proc_mng_policy(0, 0)),
+    receive foo -> ok end,
+    ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)).
 
 message_queue_too_long_test() ->
-    with_env(
-      [{conn_max_msg_queue_len, 1},
-       {conn_max_total_heap_size, 0}],
-      fun() ->
-          self() ! foo,
-          self() ! bar,
-          ?assertEqual({shutdown, message_queue_too_long},
-                       conn_proc_mng_policy()),
-          receive foo -> ok end,
-          ?assertEqual(continue, conn_proc_mng_policy()),
-          receive bar -> ok end
-      end).
+    self() ! foo,
+    self() ! bar,
+    ?assertEqual({shutdown, message_queue_too_long},
+                 conn_proc_mng_policy(1, 0)),
+    receive foo -> ok end,
+    ?assertEqual(continue, conn_proc_mng_policy(1, 0)),
+    receive bar -> ok end.
 
 total_heap_size_too_large_test() ->
-    with_env(
-      [{conn_max_msg_queue_len, 0},
-       {conn_max_total_heap_size, 1}],
-      fun() ->
-          ?assertEqual({shutdown, total_heap_size_too_large},
-                       conn_proc_mng_policy())
-      end).
+    ?assertEqual({shutdown, total_heap_size_too_large},
+                 conn_proc_mng_policy(0, 1)).
 
-with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F).
-
-conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy().
+conn_proc_mng_policy(L, S) ->
+    emqx_misc:conn_proc_mng_policy(#{message_queue_len => L,
+                                     total_heap_size => S}).

+ 1 - 1
test/emqx_session_SUITE.erl

@@ -25,7 +25,7 @@ all() -> [t_session_all].
 init_per_suite(Config) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     Config.
-    
+
 end_per_suite(_Config) ->
     emqx_ct_broker_helpers:run_teardown_steps().
 

+ 0 - 37
test/emqx_test_lib.erl

@@ -1,37 +0,0 @@
-%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_test_lib).
-
--export([ with_env/2
-        ]).
-
-with_env([], F) -> F();
-with_env([{Key, Val} | Rest], F) ->
-    Origin = get_env(Key),
-    try
-        ok = set_env(Key, Val),
-        with_env(Rest, F)
-    after
-        case Origin of
-            undefined -> ok = unset_env(Key);
-            _ -> ok = set_env(Key, Origin)
-        end
-    end.
-
-get_env(Key) -> application:get_env(?APPLICATION, Key).
-set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val).
-unset_env(Key) -> application:unset_env(?APPLICATION, Key).
-
-