Jelajahi Sumber

Replace the customized total heap size check with the `max_heap_size` process flag

The `max_heap_size` process flag can be used to limit the total
heap size of a process, and it gives a much more detailed
crash log if the limit is hit.
spring2maz 7 tahun lalu
induk
melakukan
e3f2ae8db8
5 mengubah file dengan 27 tambahan dan 29 penghapusan
  1. 3 3
      priv/emqx.schema
  2. 1 1
      src/emqx_connection.erl
  3. 16 14
      src/emqx_misc.erl
  4. 1 1
      src/emqx_session.erl
  5. 6 10
      test/emqx_misc_tests.erl

+ 3 - 3
priv/emqx.schema

@@ -835,7 +835,6 @@ end}.
 %% connection/session process.
 %% Message queue here is the Erlang process mailbox, but not the number
 %% of queued MQTT messages of QoS 1 and 2.
-%% Total heap size is the in Erlang 'words' not in 'bytes'.
 %% Zero or negative is to disable.
 {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
   {default, "0 | 0MB"},
@@ -868,7 +867,8 @@ end}.
                                    {error, Reason} ->
                                        error(Reason);
                                    Bytes1 ->
-                                       #{bytes => Bytes1, count => list_to_integer(Count)}
+                                       #{bytes => Bytes1,
+                                         count => list_to_integer(Count)}
                                end,
                     {force_gc_policy, GcPolicy};
                ("force_shutdown_policy", Val) ->
@@ -878,7 +878,7 @@ end}.
                                              error(Reason);
                                          Siz1 ->
                                              #{message_queue_len => list_to_integer(Len),
-                                               total_heap_size   => Siz1}
+                                               max_heap_size => Siz1}
                                      end,
                     {force_shutdown_policy, ShutdownPolicy};
                (Opt, Val) ->

+ 1 - 1
src/emqx_connection.erl

@@ -152,7 +152,7 @@ init([Transport, RawSocket, Options]) ->
                                      }),
             GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
             ok = emqx_gc:init(GcPolicy),
-            erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
+            ok = emqx_misc:init_proc_mng_policy(Zone),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->

+ 16 - 14
src/emqx_misc.erl

@@ -15,7 +15,9 @@
 -module(emqx_misc).
 
 -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
-         proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]).
+         proc_name/2, proc_stats/0, proc_stats/1]).
+
+-export([init_proc_mng_policy/1, conn_proc_mng_policy/1]).
 
 %% @doc Merge options
 -spec(merge_opts(list(), list()) -> list()).
@@ -60,32 +62,35 @@ proc_stats(Pid) ->
 
 -define(DISABLED, 0).
 
+init_proc_mng_policy(Zone) ->
+    #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy =
+        emqx_zone:get_env(Zone, force_shutdown_policy),
+    MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize),
+    _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded
+    erlang:put(force_shutdown_policy, ShutdownPolicy),
+    ok.
+
 %% @doc Check self() process status against connection/session process management policy,
 %% return `continue | hibernate | {shutdown, Reason}' accordingly.
 %% `continue': There is nothing out of the ordinary.
 %% `hibernate': Nothing to process in my mailbox, and since this check is triggered
 %%              by a timer, we assume it will most likely stay idle, hence hibernate.
-%% `shutdown': Some numbers (message queue length or heap size have hit the limit),
+%% `shutdown': The message queue length has hit the limit,
 %%             hence shutdown for greater good (system stability).
--spec(conn_proc_mng_policy(#{message_queue_len := integer(),
-                             total_heap_size := integer()
-                            } | undefined) -> continue | hibernate | {shutdown, _}).
-conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen,
-                       total_heap_size := MaxTotalHeapSize
-                      }) ->
+-spec(conn_proc_mng_policy(#{message_queue_len => integer()} | false) ->
+            continue | hibernate | {shutdown, _}).
+conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen}) ->
     Qlength = proc_info(message_queue_len),
     Checks =
         [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end,
           {shutdown, message_queue_too_long}},
-         {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end,
-          {shutdown, total_heap_size_too_large}},
          {fun() -> Qlength > 0 end, continue},
          {fun() -> true end, hibernate}
         ],
     check(Checks);
 conn_proc_mng_policy(_) ->
     %% disable by default
-    conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}).
+    conn_proc_mng_policy(#{message_queue_len => 0}).
 
 check([{Pred, Result} | Rest]) ->
     case Pred() of
@@ -96,9 +101,6 @@ check([{Pred, Result} | Rest]) ->
 is_message_queue_too_long(Qlength, Max) ->
     is_enabled(Max) andalso Qlength > Max.
 
-is_heap_size_too_large(Max) ->
-    is_enabled(Max) andalso proc_info(total_heap_size) > Max.
-
 is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
 
 proc_info(Key) ->

+ 1 - 1
src/emqx_session.erl

@@ -369,7 +369,7 @@ init([Parent, #{zone                := Zone,
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     ok = emqx_gc:init(GcPolicy),
-    erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
+    ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
 

+ 6 - 10
test/emqx_misc_tests.erl

@@ -24,23 +24,19 @@ timer_cancel_flush_test() ->
 
 shutdown_disabled_test() ->
     self() ! foo,
-    ?assertEqual(continue, conn_proc_mng_policy(0, 0)),
+    ?assertEqual(continue, conn_proc_mng_policy(0)),
     receive foo -> ok end,
-    ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)).
+    ?assertEqual(hibernate, conn_proc_mng_policy(0)).
 
 message_queue_too_long_test() ->
     self() ! foo,
     self() ! bar,
     ?assertEqual({shutdown, message_queue_too_long},
-                 conn_proc_mng_policy(1, 0)),
+                 conn_proc_mng_policy(1)),
     receive foo -> ok end,
-    ?assertEqual(continue, conn_proc_mng_policy(1, 0)),
+    ?assertEqual(continue, conn_proc_mng_policy(1)),
     receive bar -> ok end.
 
-total_heap_size_too_large_test() ->
-    ?assertEqual({shutdown, total_heap_size_too_large},
-                 conn_proc_mng_policy(0, 1)).
+conn_proc_mng_policy(L) ->
+    emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}).
 
-conn_proc_mng_policy(L, S) ->
-    emqx_misc:conn_proc_mng_policy(#{message_queue_len => L,
-                                     total_heap_size => S}).