Просмотр исходного кода

fix(conn): postpone arming stats timer until CONNECT is processed

Andrew Mayorov 1 год назад
Родитель
Сommit
c76970f6ed
2 измененных файлов с 45 добавлено и 32 удалено
  1. 40 27
      apps/emqx/src/emqx_connection.erl
  2. 5 5
      apps/emqx/test/emqx_connection_SUITE.erl

+ 40 - 27
apps/emqx/src/emqx_connection.erl

@@ -101,12 +101,13 @@
     %% GC State
     gc_state :: option(emqx_gc:gc_state()),
     %% Stats Timer
-    stats_timer :: disabled | option(reference()),
-    %% Idle Timeout
-    idle_timeout :: integer() | infinity,
+    %% When `disabled` stats are never reported.
+    %% When `paused` stats are not reported until complete CONNECT packet received.
+    %% Connection starts with `paused` by default.
+    stats_timer :: disabled | paused | option(reference()),
     %% Idle Timer
     idle_timer :: option(reference()),
-    %% Idle Timeout
+    %% Hibernate connection process if inactive for
     hibernate_after :: integer() | infinity,
     %% Zone name
     zone :: atom(),
@@ -341,10 +342,9 @@ init_state(
         end,
     StatsTimer =
         case emqx_config:get_zone_conf(Zone, [stats, enable]) of
-            true -> undefined;
+            true -> paused;
             false -> disabled
         end,
-    IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
 
     #state{
         transport = Transport,
@@ -358,8 +358,7 @@ init_state(
         channel = Channel,
         gc_state = GcState,
         stats_timer = StatsTimer,
-        idle_timeout = IdleTimeout,
-        hibernate_after = maps:get(hibernate_after, Opts, IdleTimeout),
+        hibernate_after = maps:get(hibernate_after, Opts, get_zone_idle_timeout(Zone)),
         zone = Zone,
         listener = Listener,
         limiter_buffer = queue:new(),
@@ -374,20 +373,17 @@ run_loop(
         transport = Transport,
         socket = Socket,
         peername = Peername,
-        channel = Channel,
         listener = Listener,
-        idle_timeout = IdleTimeout
+        zone = Zone
     }
 ) ->
     emqx_logger:set_metadata_peername(esockd:format(Peername)),
-    ShutdownPolicy = emqx_config:get_zone_conf(
-        emqx_channel:info(zone, Channel),
-        [force_shutdown]
-    ),
+    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
     emqx_utils:tune_heap_size(ShutdownPolicy),
     case activate_socket(State) of
         {ok, NState} ->
             ok = set_tcp_keepalive(Listener),
+            IdleTimeout = get_zone_idle_timeout(Zone),
             IdleTimer = start_timer(IdleTimeout, idle_timeout),
             hibernate(Parent, NState#state{idle_timer = IdleTimer});
         {error, Reason} ->
@@ -430,7 +426,7 @@ recvloop(
             true ->
                 recvloop(Parent, State);
             false ->
-                _ = set_chan_stats(State),
+                _ = try_set_chan_stats(State),
                 hibernate(Parent, cancel_stats_timer(State))
         end
     end.
@@ -440,8 +436,8 @@ handle_recv({system, From, Request}, Parent, State) ->
 handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
     %% FIXME: it's not trapping exit, should never receive an EXIT
     terminate(Reason, State);
-handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
-    case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
+handle_recv(Msg, Parent, State) ->
+    case process_msg([Msg], ensure_stats_timer(State)) of
         {ok, NewState} ->
             ?MODULE:recvloop(Parent, NewState);
         {stop, Reason, NewSate} ->
@@ -458,10 +454,23 @@ wakeup_from_hib(Parent, State) ->
 %%--------------------------------------------------------------------
 %% Ensure/cancel stats timer
 
--compile({inline, [ensure_stats_timer/2]}).
+-compile({inline, [ensure_stats_timer/1, ensure_stats_timer/2]}).
+ensure_stats_timer(State = #state{stats_timer = undefined}) ->
+    Timeout = get_zone_idle_timeout(State#state.zone),
+    ensure_stats_timer(Timeout, State);
+ensure_stats_timer(State) ->
+    State.
+
 ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
     State#state{stats_timer = start_timer(Timeout, emit_stats)};
 ensure_stats_timer(_Timeout, State) ->
+    %% Either already active, disabled, or paused.
+    State.
+
+-compile({inline, [resume_stats_timer/1]}).
+resume_stats_timer(State = #state{stats_timer = paused}) ->
+    ensure_stats_timer(0, State#state{stats_timer = undefined});
+resume_stats_timer(State = #state{stats_timer = disabled}) ->
     State.
 
 -compile({inline, [cancel_stats_timer/1]}).
@@ -472,6 +481,10 @@ cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -
 cancel_stats_timer(State) ->
     State.
 
+-compile({inline, [get_zone_idle_timeout/1]}).
+get_zone_idle_timeout(Zone) ->
+    emqx_channel:get_mqtt_conf(Zone, idle_timeout).
+
 %%--------------------------------------------------------------------
 %% Process next Msg
 
@@ -552,12 +565,13 @@ handle_msg(
     State = #state{idle_timer = IdleTimer}
 ) ->
     ok = emqx_utils:cancel_timer(IdleTimer),
-    Serialize = emqx_frame:serialize_opts(ConnPkt),
     NState = State#state{
-        serialize = Serialize,
+        serialize = emqx_frame:serialize_opts(ConnPkt),
         idle_timer = undefined
     },
-    handle_incoming(Packet, NState);
+    %% Causes stats timer to be emitted (if enabled) right after CONNECT is processed.
+    FState = resume_stats_timer(NState),
+    handle_incoming(Packet, FState);
 handle_msg({incoming, Packet}, State) ->
     ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
     handle_incoming(Packet, State);
@@ -724,7 +738,8 @@ handle_timeout(
         socket = Socket
     }
 ) ->
-    _ = set_chan_stats(State),
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_stats(ClientId, stats(State)),
     emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
     {ok, State#state{stats_timer = undefined}};
 handle_timeout(
@@ -743,7 +758,7 @@ handle_timeout(
 handle_timeout(TRef, Msg, State) ->
     with_channel(handle_timeout, [TRef, Msg], State).
 
-set_chan_stats(State = #state{channel = Channel}) ->
+try_set_chan_stats(State = #state{channel = Channel}) ->
     case emqx_channel:info(clientid, Channel) of
         %% ClientID is not yet known, nothing to report.
         undefined -> false;
@@ -1111,10 +1126,8 @@ run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) ->
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.
 
-check_oom(Pubs, Bytes, State = #state{channel = Channel}) ->
-    ShutdownPolicy = emqx_config:get_zone_conf(
-        emqx_channel:info(zone, Channel), [force_shutdown]
-    ),
+check_oom(Pubs, Bytes, State = #state{zone = Zone}) ->
+    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
     case emqx_utils:check_oom(ShutdownPolicy) of
         {shutdown, Reason} ->
             %% triggers terminate/2 callback immediately

+ 5 - 5
apps/emqx/test/emqx_connection_SUITE.erl

@@ -188,15 +188,15 @@ t_process_msg(_) ->
     ).
 
 t_ensure_stats_timer(_) ->
-    NStats = emqx_connection:ensure_stats_timer(100, st()),
-    Stats_timer = emqx_connection:info(stats_timer, NStats),
-    ?assert(is_reference(Stats_timer)),
+    NStats = emqx_connection:ensure_stats_timer(100, st(#{stats_timer => undefined})),
+    StatsTimer = emqx_connection:info(stats_timer, NStats),
+    ?assert(is_reference(StatsTimer)),
     ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
 
 t_cancel_stats_timer(_) ->
     NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
-    Stats_timer = emqx_connection:info(stats_timer, NStats),
-    ?assertEqual(undefined, Stats_timer),
+    StatsTimer = emqx_connection:info(stats_timer, NStats),
+    ?assertEqual(undefined, StatsTimer),
     ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).
 
 t_append_msg(_) ->