Bläddra i källkod

Merge pull request #10744 from savonarola/0518-fix-eviction

fix(evacuation): handle expire interval correctly
Zaiming (Stone) Shi 2 år sedan
förälder
incheckning
2fdf4b5dac

+ 2 - 0
apps/emqx/include/emqx_channel.hrl

@@ -40,3 +40,5 @@
     session,
     will_msg
 ]).
+
+-define(EXPIRE_INTERVAL_INFINITE, 4294967295000).

+ 1 - 1
apps/emqx/src/emqx_channel.erl

@@ -2079,7 +2079,7 @@ maybe_resume_session(#channel{
 
 maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
     case maps:get(expiry_interval, ConnInfo) of
-        ?UINT_MAX ->
+        ?EXPIRE_INTERVAL_INFINITE ->
             {ok, Channel};
         I when I > 0 ->
             {ok, ensure_timer(expire_timer, I, Channel)};

+ 1 - 0
apps/emqx/src/emqx_cm.erl

@@ -773,6 +773,7 @@ mark_channel_connected(ChanPid) ->
 mark_channel_disconnected(ChanPid) ->
     ?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
     ets:delete(?CHAN_LIVE_TAB, ChanPid),
+    ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}),
     ok.
 
 get_connected_client_count() ->

+ 2 - 4
apps/emqx/src/persistent_session/emqx_persistent_session.erl

@@ -60,14 +60,12 @@
 -export_type([sess_msg_key/0]).
 
 -include("emqx.hrl").
+-include("emqx_channel.hrl").
 -include("emqx_persistent_session.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -compile({inline, [is_store_enabled/0]}).
 
-%% 16#FFFFFFFF * 1000
--define(MAX_EXPIRY_INTERVAL, 4294967295000).
-
 %% NOTE: Order is significant because of traversal order of the table.
 -define(MARKER, 3).
 -define(DELIVERED, 2).
@@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) ->
 %% @private [MQTT-3.1.2-23]
 persistent_session_status(#session_store{expiry_interval = 0}) ->
     not_persistent;
-persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
+persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) ->
     persistent;
 persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
     case E + TS > erlang:system_time(millisecond) of

+ 1 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_eviction_agent, [
     {description, "EMQX Eviction Agent"},
-    {vsn, "5.0.0"},
+    {vsn, "5.0.1"},
     {registered, [
         emqx_eviction_agent_sup,
         emqx_eviction_agent,

+ 2 - 2
apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl

@@ -218,10 +218,10 @@ cancel_expiry_timer(_) ->
 
 set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
     case maps:get(expiry_interval, ConnInfo) of
-        ?UINT_MAX ->
+        ?EXPIRE_INTERVAL_INFINITE ->
             {ok, Channel};
         I when I > 0 ->
-            Timer = erlang:send_after(timer:seconds(I), self(), expire_session),
+            Timer = erlang:send_after(I, self(), expire_session),
             {ok, Channel#{expiry_timer => Timer}};
         _ ->
             {error, should_be_expired}

+ 3 - 3
apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl

@@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) ->
                 ?assert(false, "Connection not evicted")
             end
         end,
-        #{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid},
+        #{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid},
         2000
     ),
 
@@ -383,7 +383,7 @@ t_ws_conn(_Config) ->
 
     ?assertWaitEvent(
         ok = emqx_eviction_agent:evict_connections(1),
-        #{?snk_kind := emqx_cm_connected_client_count_dec},
+        #{?snk_kind := emqx_cm_connected_client_count_dec_done},
         1000
     ),
 
@@ -418,7 +418,7 @@ t_quic_conn(_Config) ->
 
     ?assertWaitEvent(
         ok = emqx_eviction_agent:evict_connections(1),
-        #{?snk_kind := emqx_cm_connected_client_count_dec},
+        #{?snk_kind := emqx_cm_connected_client_count_dec_done},
         1000
     ),
 

+ 2 - 1
apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl

@@ -10,6 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_channel.hrl").
 
 -define(CLIENT_ID, <<"client_with_session">>).
 
@@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) ->
         conninfo => #{
             clientid => ?CLIENT_ID,
             receive_maximum => 32,
-            expiry_interval => ?UINT_MAX
+            expiry_interval => ?EXPIRE_INTERVAL_INFINITE
         }
     },
     ?assertMatch(