Просмотр исходного кода

chore(emqx_connection): Add a test case to cover oom kill

Zaiming Shi 5 лет назад
Родитель
Сommit
6f5aa88562
2 измененных файлов с 47 добавлено и 3 удалено
  1. 3 1
      src/emqx_connection.erl
  2. 44 2
      test/emqx_connection_SUITE.erl

+ 3 - 1
src/emqx_connection.erl

@@ -21,6 +21,7 @@
 -include("emqx_mqtt.hrl").
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 -include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 -logger_header("[MQTT]").
 -logger_header("[MQTT]").
 
 
@@ -449,7 +450,7 @@ handle_msg(Msg, State) ->
 -spec terminate(any(), state()) -> no_return().
 -spec terminate(any(), state()) -> no_return().
 terminate(Reason, State = #state{channel = Channel, transport = Transport,
 terminate(Reason, State = #state{channel = Channel, transport = Transport,
           socket = Socket}) ->
           socket = Socket}) ->
-    ?LOG(debug, "Terminated due to ~p", [Reason]),
+    ?tp(debug, terminate, #{reason => Reason}),
     Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
     Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
     emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
     emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
     emqx_channel:terminate(Reason, Channel1),
     emqx_channel:terminate(Reason, Channel1),
@@ -686,6 +687,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
 check_oom(State = #state{channel = Channel}) ->
 check_oom(State = #state{channel = Channel}) ->
     Zone = emqx_channel:info(zone, Channel),
     Zone = emqx_channel:info(zone, Channel),
     OomPolicy = emqx_zone:oom_policy(Zone),
     OomPolicy = emqx_zone:oom_policy(Zone),
+    ?tp(debug, check_oom, #{policy => OomPolicy}),
     case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
     case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
         {shutdown, Reason} ->
         {shutdown, Reason} ->
             %% triggers terminate/2 callback immediately
             %% triggers terminate/2 callback immediately

+ 44 - 2
test/emqx_connection_SUITE.erl

@@ -21,6 +21,7 @@
 
 
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 all() -> emqx_ct:all(?MODULE).
 all() -> emqx_ct:all(?MODULE).
 
 
@@ -88,11 +89,18 @@ init_per_testcase(TestCase, Config) when
     ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
     ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
     ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
     ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
     ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
     ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
-    Config;
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase(init, Config);
+        _ -> Config
+    end;
 init_per_testcase(_, Config) ->
 init_per_testcase(_, Config) ->
     Config.
     Config.
 
 
-end_per_testcase(_TestCase, Config) ->
+end_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase('end', Config);
+        false -> ok
+    end,
     Config.
     Config.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -386,6 +394,40 @@ t_get_conn_info(_) ->
                                     }, SockInfo)
                                     }, SockInfo)
               end).
               end).
 
 
+t_oom_shutdown(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]),
+    ok = meck:new(emqx_zone, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(emqx_zone, oom_policy,
+                fun(_Zone) -> #{message_queue_len => 10, max_heap_size => 8000000} end),
+    meck:expect(emqx_misc, check_oom,
+                fun(_) -> {shutdown, "fake_oom"} end),
+    Config;
+t_oom_shutdown('end', _Config) ->
+    snabbkaffe:stop(),
+    meck:unload(emqx_misc),
+    meck:unload(emqx_zone),
+    ok.
+
+t_oom_shutdown(_) ->
+    Opts = #{trap_exit => true},
+    with_conn(
+      fun(Pid) ->
+              Pid ! {tcp_passive, foo},
+              ?block_until(#{?snk_kind := check_oom}, 100),
+              ?block_until(#{?snk_kind := terminate}, 10),
+              Trace = snabbkaffe:collect_trace(),
+              ?assertEqual(1, length(?of_kind(terminate, Trace))),
+              receive
+                  {'EXIT', Pid, Reason} ->
+                      ?assertEqual({shutdown, "fake_oom"}, Reason)
+              after 1000 ->
+                        error(timeout)
+              end,
+              ?assertNot(erlang:is_process_alive(Pid))
+      end, Opts),
+    ok.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Helper functions
 %% Helper functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------