Quellcode durchsuchen

fix(conn): resume stats timer in on `connected` event instead

Also don't fire zero-timeout timer because `connected` event handling
code already takes care of initialization the channel metrics.
Andrew Mayorov vor 1 Jahr
Ursprung
Commit
c74f7dfa11

+ 6 - 12
apps/emqx/src/emqx_connection.erl

@@ -454,22 +454,17 @@ wakeup_from_hib(Parent, State) ->
 %%--------------------------------------------------------------------
 %% Ensure/cancel stats timer
 
--compile({inline, [ensure_stats_timer/1, ensure_stats_timer/2]}).
+-compile({inline, [ensure_stats_timer/1]}).
 ensure_stats_timer(State = #state{stats_timer = undefined}) ->
     Timeout = get_zone_idle_timeout(State#state.zone),
-    ensure_stats_timer(Timeout, State);
-ensure_stats_timer(State) ->
-    State.
-
-ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
     State#state{stats_timer = start_timer(Timeout, emit_stats)};
-ensure_stats_timer(_Timeout, State) ->
+ensure_stats_timer(State) ->
     %% Either already active, disabled, or paused.
     State.
 
 -compile({inline, [resume_stats_timer/1]}).
 resume_stats_timer(State = #state{stats_timer = paused}) ->
-    ensure_stats_timer(0, State#state{stats_timer = undefined});
+    State#state{stats_timer = undefined};
 resume_stats_timer(State = #state{stats_timer = disabled}) ->
     State.
 
@@ -569,9 +564,7 @@ handle_msg(
         serialize = emqx_frame:serialize_opts(ConnPkt),
         idle_timer = undefined
     },
-    %% Causes stats timer to be emitted (if enabled) right after CONNECT is processed.
-    FState = resume_stats_timer(NState),
-    handle_incoming(Packet, FState);
+    handle_incoming(Packet, NState);
 handle_msg({incoming, Packet}, State) ->
     ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
     handle_incoming(Packet, State);
@@ -622,7 +615,8 @@ handle_msg(
             maps:get(conn_pid, QSS), {PS, Serialize, Channel}
         ),
     ClientId = emqx_channel:info(clientid, Channel),
-    emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
+    emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
+    {ok, resume_stats_timer(State)};
 handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:set_chan_info(ClientId, info(State)),

+ 3 - 3
apps/emqx/test/emqx_connection_SUITE.erl

@@ -188,10 +188,10 @@ t_process_msg(_) ->
     ).
 
 t_ensure_stats_timer(_) ->
-    NStats = emqx_connection:ensure_stats_timer(100, st(#{stats_timer => undefined})),
+    NStats = emqx_connection:ensure_stats_timer(st(#{stats_timer => undefined})),
     StatsTimer = emqx_connection:info(stats_timer, NStats),
     ?assert(is_reference(StatsTimer)),
-    ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
+    ?assertEqual(NStats, emqx_connection:ensure_stats_timer(NStats)).
 
 t_cancel_stats_timer(_) ->
     NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
@@ -272,7 +272,7 @@ t_handle_msg_event(_) ->
     ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
     ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
     ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
-    ?assertEqual(ok, handle_msg({event, connected}, st())),
+    ?assertMatch({ok, _St}, handle_msg({event, connected}, st())),
     ?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
     ?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
 

+ 7 - 2
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -384,11 +384,16 @@ t_connect_emit_stats_timeout('end', _Config) ->
 
 t_connect_emit_stats_timeout(Config) ->
     ConnFun = ?config(conn_fun, Config),
-    {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
+    IdleTimeout = ?config(idle_timeout, Config),
     {ok, Client} = emqtt:start_link([{proto_ver, v5}, {keepalive, 60} | Config]),
     {ok, _} = emqtt:ConnFun(Client),
+    %% Poke the connection to ensure stats timer is armed.
+    pong = emqtt:ping(Client),
     [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
-    ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
+    ?assertMatch(
+        TRef when is_reference(TRef),
+        emqx_connection:info(stats_timer, sys:get_state(ClientPid))
+    ),
     ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
     ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
     ok = emqtt:disconnect(Client).