فهرست منبع

perf(dssess): avoid periodical last_alive_at bumping in ds sessions

Ilya Averyanov 1 سال پیش
والد
کامیت
712ca70fef

+ 1 - 0
apps/emqx/include/emqx_durable_session_metadata.hrl

@@ -22,6 +22,7 @@
 %% Session metadata keys:
 -define(created_at, created_at).
 -define(last_alive_at, last_alive_at).
+-define(node_epoch_id, node_epoch_id).
 -define(expiry_interval, expiry_interval).
 %% Unique integer used to create unique identities:
 -define(last_id, last_id).

+ 2 - 2
apps/emqx/src/emqx_cm_sup.erl

@@ -52,7 +52,7 @@ init([]) ->
     Registry = child_spec(emqx_cm_registry, 5000, worker),
     RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
-    DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
+    DSSessionSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
     DSSessionBookkeeper = child_spec(emqx_persistent_session_bookkeeper, 5_000, worker),
     Children =
         [
@@ -63,7 +63,7 @@ init([]) ->
             Registry,
             RegistryKeeper,
             Manager,
-            DSSessionGCSup,
+            DSSessionSup,
             DSSessionBookkeeper
         ],
     {ok, {SupFlags, Children}}.

+ 31 - 15
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -751,14 +751,6 @@ handle_info(Msg, Session, _ClientInfo) ->
 shared_sub_opts(SessionId) ->
     #{session_id => SessionId}.
 
-bump_last_alive(S0) ->
-    %% Note: we take a pessimistic approach here and assume that the client will be alive
-    %% until the next bump timeout.  With this, we avoid garbage collecting this session
-    %% too early in case the session/connection/node crashes earlier without having time
-    %% to commit the time.
-    EstimatedLastAliveAt = now_ms() + bump_interval(),
-    emqx_persistent_session_ds_state:set_last_alive_at(EstimatedLastAliveAt, S0).
-
 -spec replay(clientinfo(), [], session()) ->
     {ok, replies(), session()}.
 replay(ClientInfo, [], Session0 = #{s := S0}) ->
@@ -908,12 +900,17 @@ disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo
     {shutdown, commit(Session#{s := S, shared_sub_s := SharedSubS})}.
 
 -spec terminate(Reason :: term(), session()) -> ok.
-terminate(_Reason, Session = #{id := Id}) ->
-    maybe_set_will_message_timer(Session),
-    _ = commit(Session),
+terminate(_Reason, Session = #{s := S0, id := Id}) ->
+    _ = maybe_set_will_message_timer(Session),
+    S = finalize_last_alive_at(S0),
+    _ = commit(Session#{s := S}),
     ?tp(debug, persistent_session_ds_terminate, #{id => Id}),
     ok.
 
+%% Prepare the session state for a regular shutdown: store the current time as
+%% `last_alive_at' and reset the node epoch id to `undefined'.
+finalize_last_alive_at(SessionState) ->
+    Stamped = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), SessionState),
+    emqx_persistent_session_ds_state:set_node_epoch_id(undefined, Stamped).
+
 %%--------------------------------------------------------------------
 %% Management APIs (dashboard)
 %%--------------------------------------------------------------------
@@ -1004,7 +1001,7 @@ session_open(
     case emqx_persistent_session_ds_state:open(SessionId) of
         {ok, S0} ->
             EI = emqx_persistent_session_ds_state:get_expiry_interval(S0),
-            LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0),
+            LastAliveAt = get_last_alive_at(S0),
             case NowMS >= LastAliveAt + EI of
                 true ->
                     session_drop(SessionId, expired),
@@ -1013,7 +1010,7 @@ session_open(
                     ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}),
                     %% New connection being established
                     S1 = emqx_persistent_session_ds_state:set_expiry_interval(EI, S0),
-                    S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
+                    S2 = init_last_alive_at(NowMS, S1),
                     S3 = emqx_persistent_session_ds_state:set_peername(
                         maps:get(peername, NewConnInfo), S2
                     ),
@@ -1050,6 +1047,26 @@ session_open(
             false
     end.
 
+%% Same as init_last_alive_at/2, taking the current time as the base timestamp.
+init_last_alive_at(SessionState) ->
+    Now = now_ms(),
+    init_last_alive_at(Now, SessionState).
+
+%% Tag the session with the epoch id of the local node and set `last_alive_at'
+%% to `NowMs + bump_interval()'.
+init_last_alive_at(NowMs, SessionState) ->
+    EpochId = emqx_persistent_session_ds_node_heartbeat_worker:get_node_epoch_id(),
+    Tagged = emqx_persistent_session_ds_state:set_node_epoch_id(EpochId, SessionState),
+    EstimatedLastAliveAt = NowMs + bump_interval(),
+    emqx_persistent_session_ds_state:set_last_alive_at(EstimatedLastAliveAt, Tagged).
+
+%% NOTE
+%% Here we ignore the case when:
+%% * the session is terminated abnormally, without running the terminate callback,
+%% e.g. when the connection was brutally killed;
+%% * but its node and persistent session subsystem remained alive.
+%%
+%% In this case, the session's lifetime is prolonged until the node terminates.
+get_last_alive_at(SessionState) ->
+    %% Combine the timestamp stored in the session with the heartbeat of the
+    %% node epoch the session is tagged with.
+    emqx_persistent_session_ds_gc_worker:session_last_alive_at(
+        emqx_persistent_session_ds_state:get_last_alive_at(SessionState),
+        emqx_persistent_session_ds_state:get_node_epoch_id(SessionState)
+    ).
+
 -spec session_ensure_new(
     id(),
     emqx_types:clientinfo(),
@@ -1065,7 +1082,7 @@ session_ensure_new(
     Now = now_ms(),
     S0 = emqx_persistent_session_ds_state:create_new(Id),
     S1 = emqx_persistent_session_ds_state:set_expiry_interval(expiry_interval(ConnInfo), S0),
-    S2 = bump_last_alive(S1),
+    S2 = init_last_alive_at(S1),
     S3 = emqx_persistent_session_ds_state:set_created_at(Now, S2),
     S4 = lists:foldl(
         fun(Track, Acc) ->
@@ -1097,7 +1114,6 @@ session_ensure_new(
         replay => undefined,
         ?TIMER_PULL => undefined,
         ?TIMER_PUSH => undefined,
-        ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
         ?TIMER_RETRY_REPLAY => undefined,
         ?TIMER_SHARED_SUB => undefined
     }.

+ 21 - 4
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_gc_worker.erl

@@ -18,8 +18,6 @@
 -behaviour(gen_server).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
--include_lib("stdlib/include/qlc.hrl").
--include_lib("stdlib/include/ms_transform.hrl").
 
 -include("session_internals.hrl").
 
@@ -27,7 +25,8 @@
 -export([
     start_link/0,
     check_session/1,
-    check_session_after/2
+    check_session_after/2,
+    session_last_alive_at/2
 ]).
 
 %% `gen_server' API
@@ -60,6 +59,22 @@ check_session_after(SessionId, Time0) ->
     _ = erlang:send_after(Time, ?MODULE, #check_session{id = SessionId}),
     ok.
 
+%% Compute the effective `last_alive_at' of a session.
+%%
+%% * Session without a node epoch id (`undefined'): the stored timestamp is
+%%   returned as is.
+%% * Epoch id with no heartbeat record: the stored timestamp is returned as is.
+%% * Otherwise: the later of the stored timestamp and the node's recorded
+%%   heartbeat plus the configured heartbeat interval.
+-spec session_last_alive_at(
+    pos_integer(), emqx_persistent_session_ds_node_heartbeat_worker:epoch_id() | undefined
+) -> pos_integer().
+session_last_alive_at(LastAliveAt, undefined) ->
+    LastAliveAt;
+session_last_alive_at(LastAliveAt, NodeEpochId) ->
+    case emqx_persistent_session_ds_node_heartbeat_worker:get_last_alive_at(NodeEpochId) of
+        undefined ->
+            LastAliveAt;
+        NodeLastAliveAt ->
+            max(
+                LastAliveAt,
+                NodeLastAliveAt + emqx_config:get([durable_sessions, heartbeat_interval])
+            )
+    end.
+
 %%--------------------------------------------------------------------------------
 %% `gen_server' API
 %%--------------------------------------------------------------------------------
@@ -152,11 +167,13 @@ gc_loop(MinLastAlive, It0) ->
 
 do_gc(MinLastAlive, SessionId, Metadata) ->
     #{
-        ?last_alive_at := LastAliveAt,
+        ?last_alive_at := SessionLastAliveAt,
+        ?node_epoch_id := NodeEpochId,
         ?expiry_interval := EI,
         ?will_message := MaybeWillMessage,
         ?clientinfo := ClientInfo
     } = Metadata,
+    LastAliveAt = session_last_alive_at(SessionLastAliveAt, NodeEpochId),
     IsExpired = LastAliveAt + EI < MinLastAlive,
     case
         should_send_will_message(

+ 141 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_node_heartbeat_worker.erl

@@ -0,0 +1,141 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_ds_node_heartbeat_worker).
+
+-behaviour(gen_server).
+
+-include("session_internals.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% API
+-export([
+    create_tables/0,
+    start_link/0,
+    get_node_epoch_id/0,
+    get_last_alive_at/1
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+%% call/cast/info records
+-record(update_last_alive_at, {}).
+
+-define(epoch_id_pt_key, {?MODULE, epoch_id}).
+-define(node_epoch, node_epoch).
+-define(tab, ?node_epoch).
+
+-record(?node_epoch, {
+    epoch_id :: reference(),
+    node :: node(),
+    last_alive_at :: pos_integer()
+}).
+
+%% Identifier of a single run ("epoch") of the heartbeat worker on a node.
+-type epoch_id() :: reference().
+
+%% Exported because other modules refer to this type in their specs.
+-export_type([epoch_id/0]).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+%% Make sure the epoch table exists, then start the worker registered locally
+%% under the module name.
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    _ = create_tables(),
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% Return the epoch id that generate_node_epoch_id/0 stored in `persistent_term'.
+-spec get_node_epoch_id() -> epoch_id().
+get_node_epoch_id() ->
+    EpochId = persistent_term:get(?epoch_id_pt_key),
+    EpochId.
+
+%% Look up the heartbeat timestamp recorded for `EpochId'; `undefined' when the
+%% table has no record for it.
+-spec get_last_alive_at(epoch_id()) -> pos_integer() | undefined.
+get_last_alive_at(EpochId) ->
+    case ets:lookup(?tab, EpochId) of
+        [#?node_epoch{last_alive_at = Timestamp}] -> Timestamp;
+        [] -> undefined
+    end.
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+%% Trap exits so terminate/2 gets a chance to run, create a fresh epoch id,
+%% write the first heartbeat and arm the heartbeat timer.
+init(_Opts) ->
+    _ = erlang:process_flag(trap_exit, true),
+    ok = generate_node_epoch_id(),
+    ok = update_last_alive_at(),
+    ok = ensure_heartbeat_timer(),
+    {ok, #{}}.
+
+%% No synchronous requests are supported; every call is answered with an error.
+handle_call(_Request, _From, State) ->
+    Reply = {error, not_implemented},
+    {reply, Reply, State}.
+
+%% Casts are not part of the worker's protocol; they are dropped.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% Handle out-of-band messages.
+%%
+%% `#update_last_alive_at{}' is the periodic heartbeat tick: refresh this node's
+%% epoch record and arm the next timer.
+handle_info(#update_last_alive_at{}, State) ->
+    ok = update_last_alive_at(),
+    ok = ensure_heartbeat_timer(),
+    {noreply, State};
+%% The process traps exits, so `EXIT' messages (or anything else sent to the
+%% registered name) may arrive here; ignore them instead of crashing with
+%% `function_clause'.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% On shutdown, remove this node's epoch record from the table.
+terminate(_Reason, _State) ->
+    delete_last_alive_at().
+
+%%--------------------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------------------
+
+%% Create the node epoch table and wait for it; returns whatever
+%% `mria:wait_for_tables/1' returns.
+create_tables() ->
+    TableOpts = [
+        {rlog_shard, ?DS_MRIA_SHARD},
+        {type, set},
+        {storage, disc_copies},
+        {record_name, ?node_epoch},
+        {attributes, record_info(fields, ?node_epoch)}
+    ],
+    ok = mria:create_table(?tab, TableOpts),
+    mria:wait_for_tables([?tab]).
+
+%% Create a fresh reference and publish it as the epoch id via `persistent_term'.
+generate_node_epoch_id() ->
+    persistent_term:put(?epoch_id_pt_key, erlang:make_ref()),
+    ok.
+
+%% Schedule the next `#update_last_alive_at{}' tick to this process after the
+%% configured heartbeat interval.
+ensure_heartbeat_timer() ->
+    Interval = heartbeat_interval(),
+    _TimerRef = erlang:send_after(Interval, self(), #update_last_alive_at{}),
+    ok.
+
+%% Write this node's epoch record with `last_alive_at' set one heartbeat
+%% interval ahead of the current time.
+update_last_alive_at() ->
+    Record = #?node_epoch{
+        epoch_id = get_node_epoch_id(),
+        node = node(),
+        last_alive_at = now_ms() + heartbeat_interval()
+    },
+    ok = mria:dirty_write(?tab, Record).
+
+%% Delete the record keyed by the current epoch id from the epoch table.
+delete_last_alive_at() ->
+    Key = get_node_epoch_id(),
+    ok = mria:dirty_delete(?tab, Key).
+
+%% Read the `durable_sessions.heartbeat_interval' setting from the config.
+heartbeat_interval() ->
+    ConfigPath = [durable_sessions, heartbeat_interval],
+    emqx_config:get(ConfigPath).
+
+%% Current system time in milliseconds.
+now_ms() ->
+    Unit = millisecond,
+    erlang:system_time(Unit).

+ 11 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -44,6 +44,7 @@
 ]).
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
+-export([get_node_epoch_id/1, set_node_epoch_id/2]).
 -export([get_expiry_interval/1, set_expiry_interval/2]).
 -export([get_clientinfo/1, set_clientinfo/2]).
 -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
@@ -171,6 +172,7 @@
     #{
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
+        ?node_epoch_id => emqx_persistent_session_ds_node_heartbeat_worker:epoch_id() | undefined,
         ?expiry_interval => non_neg_integer(),
         ?last_id => integer(),
         ?peername => emqx_types:peername(),
@@ -632,6 +634,15 @@ get_last_alive_at(Rec) ->
 set_last_alive_at(Val, Rec) ->
     set_meta(?last_alive_at, Val, Rec).
 
+%% Read the `?node_epoch_id' entry from the session metadata.
+-spec get_node_epoch_id(t()) ->
+    emqx_persistent_session_ds_node_heartbeat_worker:epoch_id() | undefined.
+get_node_epoch_id(Session) ->
+    get_meta(?node_epoch_id, Session).
+
+%% Store the node epoch id (or `undefined' to clear it) in the session metadata.
+-spec set_node_epoch_id(
+    emqx_persistent_session_ds_node_heartbeat_worker:epoch_id() | undefined, t()
+) -> t().
+set_node_epoch_id(Val, Rec) ->
+    set_meta(?node_epoch_id, Val, Rec).
+
 -spec get_expiry_interval(t()) -> non_neg_integer() | undefined.
 get_expiry_interval(Rec) ->
     get_meta(?expiry_interval, Rec).

+ 6 - 3
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_sup.erl

@@ -53,14 +53,17 @@ do_init(_Opts) ->
         period => 2,
         auto_shutdown => never
     },
-    CoreChildren = [
+    CoreNodeChildren = [
         worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []),
         worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, [])
     ],
+    AnyNodeChildren = [
+        worker(node_heartbeat, emqx_persistent_session_ds_node_heartbeat_worker, [])
+    ],
     Children =
         case mria_rlog:role() of
-            core -> CoreChildren;
-            replicant -> []
+            core -> CoreNodeChildren ++ AnyNodeChildren;
+            replicant -> AnyNodeChildren
         end,
     {ok, {SupFlags, Children}}.