Przeglądaj źródła

Merge pull request #13347 from zhongwencool/improve-check-oom-log

chore: log shutdown reason for check_oom trace log
zhongwencool 1 rok temu
rodzic
commit
2c48d7e0f0

+ 18 - 10
apps/emqx/src/emqx_connection.erl

@@ -564,12 +564,10 @@ handle_msg({Closed, _Sock}, State) when
 handle_msg({Passive, _Sock}, State) when
     Passive == tcp_passive; Passive == ssl_passive; Passive =:= quic_passive
 ->
-    %% In Stats
     Pubs = emqx_pd:reset_counter(incoming_pubs),
     Bytes = emqx_pd:reset_counter(incoming_bytes),
-    InStats = #{cnt => Pubs, oct => Bytes},
     %% Run GC and Check OOM
-    NState1 = check_oom(run_gc(InStats, State)),
+    NState1 = check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State)),
     handle_info(activate_socket, NState1);
 handle_msg(
     Deliver = {deliver, _Topic, _Msg},
@@ -899,8 +897,7 @@ sent(#state{listener = {Type, Listener}} = State) ->
         true ->
             Pubs = emqx_pd:reset_counter(outgoing_pubs),
             Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
+            {ok, check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State))};
         false ->
             {ok, State}
     end.
@@ -1080,25 +1077,36 @@ retry_limiter(#state{channel = Channel, limiter = Limiter} = State) ->
 %%--------------------------------------------------------------------
 %% Run GC and Check OOM
 
-run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) ->
+run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) ->
     case
         ?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone) andalso
-            emqx_gc:run(Stats, GcSt)
+            emqx_gc:run(Pubs, Bytes, GcSt)
     of
         false -> State;
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.
 
-check_oom(State = #state{channel = Channel}) ->
+check_oom(Pubs, Bytes, State = #state{channel = Channel}) ->
     ShutdownPolicy = emqx_config:get_zone_conf(
         emqx_channel:info(zone, Channel), [force_shutdown]
     ),
-    ?tp(debug, check_oom, #{policy => ShutdownPolicy}),
     case emqx_utils:check_oom(ShutdownPolicy) of
         {shutdown, Reason} ->
             %% triggers terminate/2 callback immediately
+            ?tp(warning, check_oom_shutdown, #{
+                policy => ShutdownPolicy,
+                incoming_pubs => Pubs,
+                incoming_bytes => Bytes,
+                shutdown => Reason
+            }),
             erlang:exit({shutdown, Reason});
-        _ ->
+        Result ->
+            ?tp(debug, check_oom_ok, #{
+                policy => ShutdownPolicy,
+                incoming_pubs => Pubs,
+                incoming_bytes => Bytes,
+                result => Result
+            }),
             ok
     end,
     State.

+ 1 - 7
apps/emqx/src/emqx_gc.erl

@@ -30,7 +30,6 @@
 
 -export([
     init/1,
-    run/2,
     run/3,
     info/1,
     reset/1
@@ -62,12 +61,7 @@ init(#{count := Count, bytes := Bytes}) ->
     Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
     ?GCS(maps:from_list(Cnt ++ Oct)).
 
-%% @doc Try to run GC based on reduntions of count or bytes.
--spec run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state()) ->
-    {boolean(), gc_state()}.
-run(#{cnt := Cnt, oct := Oct}, GcSt) ->
-    run(Cnt, Oct, GcSt).
-
+%% @doc Try to run GC based on reductions of count or bytes.
 -spec run(pos_integer(), pos_integer(), gc_state()) ->
     {boolean(), gc_state()}.
 run(Cnt, Oct, ?GCS(St)) ->

+ 7 - 9
apps/emqx/src/emqx_ws_connection.erl

@@ -451,8 +451,8 @@ websocket_info({incoming, Packet}, State) ->
     handle_incoming(Packet, State);
 websocket_info({outgoing, Packets}, State) ->
     return(enqueue(Packets, State));
-websocket_info({check_gc, Stats}, State) ->
-    return(check_oom(run_gc(Stats, State)));
+websocket_info({check_gc, Cnt, Oct}, State) ->
+    return(check_oom(run_gc(Cnt, Oct, State)));
 websocket_info(
     Deliver = {deliver, _Topic, _Msg},
     State = #state{listener = {Type, Listener}}
@@ -691,8 +691,8 @@ when_msg_in(Packets, Msgs, State) ->
 %% Run GC, Check OOM
 %%--------------------------------------------------------------------
 
-run_gc(Stats, State = #state{gc_state = GcSt}) ->
-    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) ->
+    case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of
         false -> State;
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.
@@ -805,11 +805,9 @@ handle_outgoing(
                 get_active_n(Type, Listener)
         of
             true ->
-                Stats = #{
-                    cnt => emqx_pd:reset_counter(outgoing_pubs),
-                    oct => emqx_pd:reset_counter(outgoing_bytes)
-                },
-                postpone({check_gc, Stats}, State);
+                Cnt = emqx_pd:reset_counter(outgoing_pubs),
+                Oct = emqx_pd:reset_counter(outgoing_bytes),
+                postpone({check_gc, Cnt, Oct}, State);
             false ->
                 State
         end,

+ 1 - 1
apps/emqx/test/emqx_connection_SUITE.erl

@@ -525,7 +525,7 @@ t_oom_shutdown(_) ->
     with_conn(
         fun(Pid) ->
             Pid ! {tcp_passive, foo},
-            {ok, _} = ?block_until(#{?snk_kind := check_oom}, 1000),
+            {ok, _} = ?block_until(#{?snk_kind := check_oom_shutdown}, 1000),
             {ok, _} = ?block_until(#{?snk_kind := terminate}, 100),
             Trace = snabbkaffe:collect_trace(),
             ?assertEqual(1, length(?of_kind(terminate, Trace))),

+ 1 - 1
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -556,7 +556,7 @@ t_handle_outgoing(_) ->
 t_run_gc(_) ->
     GcSt = emqx_gc:init(#{count => 10, bytes => 100}),
     WsSt = st(#{gc_state => GcSt}),
-    ?ws_conn:run_gc(#{cnt => 100, oct => 10000}, WsSt).
+    ?ws_conn:run_gc(100, 10000, WsSt).
 
 t_enqueue(_) ->
     Packet = ?PUBLISH_PACKET(?QOS_0),

+ 6 - 9
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -488,14 +488,12 @@ handle_msg({Closed, _Sock}, State) when
 handle_msg({Passive, _Sock}, State) when
     Passive == tcp_passive; Passive == ssl_passive
 ->
-    %% In Stats
     Bytes = emqx_pd:reset_counter(incoming_bytes),
     Pubs = emqx_pd:reset_counter(incoming_pkt),
-    InStats = #{cnt => Pubs, oct => Bytes},
     %% Ensure Rate Limit
-    NState = ensure_rate_limit(InStats, State),
+    NState = ensure_rate_limit(State),
     %% Run GC and Check OOM
-    NState1 = check_oom(run_gc(InStats, NState)),
+    NState1 = check_oom(run_gc(Pubs, Bytes, NState)),
     handle_info(activate_socket, NState1);
 handle_msg(
     Deliver = {deliver, _Topic, _Msg},
@@ -861,8 +859,7 @@ sent(#state{active_n = ActiveN} = State) ->
         true ->
             Pubs = emqx_pd:reset_counter(outgoing_pkt),
             Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
+            {ok, check_oom(run_gc(Pubs, Bytes, State))};
         false ->
             {ok, State}
     end.
@@ -917,14 +914,14 @@ handle_info(Info, State) ->
 %% TODO
 %% Why do we need this?
 %% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly?
-ensure_rate_limit(_Stats, State) ->
+ensure_rate_limit(State) ->
     State.
 
 %%--------------------------------------------------------------------
 %% Run GC and Check OOM
 
-run_gc(Stats, State = #state{gc_state = GcSt}) ->
-    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+run_gc(Pubs, Bytes, State = #state{gc_state = GcSt}) ->
+    case ?ENABLED(GcSt) andalso emqx_gc:run(Pubs, Bytes, GcSt) of
         false -> State;
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.

+ 1 - 1
apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_ocpp, [
     {description, "OCPP-J 1.6 Gateway for EMQX"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]},
     {env, []},

+ 12 - 16
apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl

@@ -469,20 +469,18 @@ websocket_handle({Frame, _}, State) ->
 websocket_info({call, From, Req}, State) ->
     handle_call(From, Req, State);
 websocket_info({cast, rate_limit}, State) ->
-    Stats = #{
-        cnt => emqx_pd:reset_counter(incoming_pubs),
-        oct => emqx_pd:reset_counter(incoming_bytes)
-    },
-    NState = postpone({check_gc, Stats}, State),
-    return(ensure_rate_limit(Stats, NState));
+    Cnt = emqx_pd:reset_counter(incoming_pubs),
+    Oct = emqx_pd:reset_counter(incoming_bytes),
+    NState = postpone({check_gc, Cnt, Oct}, State),
+    return(ensure_rate_limit(NState));
 websocket_info({cast, Msg}, State) ->
     handle_info(Msg, State);
 websocket_info({incoming, Packet}, State) ->
     handle_incoming(Packet, State);
 websocket_info({outgoing, Packets}, State) ->
     return(enqueue(Packets, State));
-websocket_info({check_gc, Stats}, State) ->
-    return(check_oom(run_gc(Stats, State)));
+websocket_info({check_gc, Cnt, Oct}, State) ->
+    return(check_oom(run_gc(Cnt, Oct, State)));
 websocket_info(
     Deliver = {deliver, _Topic, _Msg},
     State = #state{active_n = ActiveN}
@@ -601,15 +599,15 @@ handle_timeout(TRef, TMsg, State) ->
 %% Ensure rate limit
 %%--------------------------------------------------------------------
 
-ensure_rate_limit(_Stats, State) ->
+ensure_rate_limit(State) ->
     State.
 
 %%--------------------------------------------------------------------
 %% Run GC, Check OOM
 %%--------------------------------------------------------------------
 
-run_gc(Stats, State = #state{gc_state = GcSt}) ->
-    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) ->
+    case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of
         false -> State;
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.
@@ -694,11 +692,9 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, piggyback = Piggybac
     NState =
         case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
             true ->
-                Stats = #{
-                    cnt => emqx_pd:reset_counter(outgoing_pubs),
-                    oct => emqx_pd:reset_counter(outgoing_bytes)
-                },
-                postpone({check_gc, Stats}, State);
+                Cnt = emqx_pd:reset_counter(outgoing_pubs),
+                Oct = emqx_pd:reset_counter(outgoing_bytes),
+                postpone({check_gc, Cnt, Oct}, State);
             false ->
                 State
         end,