Browse Source

feat: store session unregistration timestamp in emqx_cm_registry table

Zaiming (Stone) Shi 2 năm trước cách đây
mục cha
commit
e9318752e6

+ 8 - 1
apps/emqx/include/emqx_cm.hrl

@@ -23,7 +23,7 @@
 -define(CHAN_INFO_TAB, emqx_channel_info).
 -define(CHAN_LIVE_TAB, emqx_channel_live).
 
-%% Mria/Mnesia Tables for channel management.
+%% Mria table for session registration.
 -define(CHAN_REG_TAB, emqx_channel_registry).
 
 -define(T_KICK, 5_000).
@@ -32,4 +32,11 @@
 
 -define(CM_POOL, emqx_cm_pool).
 
+%% Registered sessions: one row of the session registry table (?CHAN_REG_TAB).
+-record(channel, {
+    chid :: emqx_types:clientid() | '_',
+    %% pid field is extended in 5.6.0: a pid() for a registered session, or an integer unregistration timestamp.
+    pid :: pid() | non_neg_integer() | '$1'
+}).
+
 -endif.

+ 132 - 18
apps/emqx/src/emqx_cm_registry.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -19,14 +19,9 @@
 
 -behaviour(gen_server).
 
--include("emqx.hrl").
--include("emqx_cm.hrl").
--include("logger.hrl").
--include("types.hrl").
-
 -export([start_link/0]).
 
--export([is_enabled/0]).
+-export([is_enabled/0, is_hist_enabled/0]).
 
 -export([
     register_channel/1,
@@ -50,10 +45,13 @@
     do_cleanup_channels/1
 ]).
 
--define(REGISTRY, ?MODULE).
--define(LOCK, {?MODULE, cleanup_down}).
+-include("emqx.hrl").
+-include("emqx_cm.hrl").
+-include("logger.hrl").
+-include("types.hrl").
 
--record(channel, {chid, pid}).
+-define(REGISTRY, ?MODULE).
+-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}).
 
 %% @doc Start the global channel registry.
 -spec start_link() -> startlink_ret().
@@ -69,6 +67,11 @@ start_link() ->
 is_enabled() ->
     emqx:get_config([broker, enable_session_registry]).
 
+%% @doc Tell whether session registration history is kept,
+%% i.e. whether the configured retain duration is greater than zero.
+-spec is_hist_enabled() -> boolean().
+is_hist_enabled() ->
+    RetainSeconds = retain_duration(),
+    RetainSeconds > 0.
+
 %% @doc Register a global channel.
 -spec register_channel(
     emqx_types:clientid()
@@ -78,8 +81,11 @@ register_channel(ClientId) when is_binary(ClientId) ->
     register_channel({ClientId, self()});
 register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
-        false -> ok
+        true ->
+            ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end),
+            mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
+        false ->
+            ok
     end.
 
 %% @doc Unregister a global channel.
@@ -91,18 +97,45 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
     unregister_channel({ClientId, self()});
 unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
-        false -> ok
+        true ->
+            mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)),
+            %% insert unregistration history after unregistration
+            ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end);
+        false ->
+            ok
     end.
 
 %% @doc Lookup the global channels.
 -spec lookup_channels(emqx_types:clientid()) -> list(pid()).
 lookup_channels(ClientId) ->
-    [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
+    lists:filtermap(
+        fun
+            (#channel{pid = ChanPid}) when is_pid(ChanPid) ->
+                case is_pid_down(ChanPid) of
+                    true ->
+                        false;
+                    _ ->
+                        {true, ChanPid}
+                end;
+            (_) ->
+                false
+        end,
+        mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
+    ).
+
+%% Liveness probe for a channel pid.
+%% For a pid on this node the answer is definite: 'true' when the process
+%% is dead, 'false' when it is alive. For anything else the answer is 'unknown'.
+is_pid_down(LocalPid) when node() =:= node(LocalPid) ->
+    IsAlive = erlang:is_process_alive(LocalPid),
+    not IsAlive;
+is_pid_down(_NotLocal) ->
+    unknown.
 
 record(ClientId, ChanPid) ->
     #channel{chid = ClientId, pid = ChanPid}.
 
+%% Build a history record: the pid field carries the current timestamp
+%% instead of a channel pid.
+hist(ClientId) ->
+    UnregisteredAt = now_ts(),
+    #channel{chid = ClientId, pid = UnregisteredAt}.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -158,15 +191,96 @@ code_change(_OldVsn, State, _Extra) ->
 
 cleanup_channels(Node) ->
     global:trans(
-        {?LOCK, self()},
+        {?NODE_DOWN_CLEANUP_LOCK, self()},
         fun() ->
             mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
         end
     ).
 
 do_cleanup_channels(Node) ->
-    Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
+    Pat = [
+        {
+            #channel{pid = '$1', _ = '_'},
+            _Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}],
+            _Return = ['$_']
+        }
+    ],
     lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
 
 delete_channel(Chan) ->
-    mnesia:delete_object(?CHAN_REG_TAB, Chan, write).
+    mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
+    ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end).
+
+%%--------------------------------------------------------------------
+%% History entry operations
+%%--------------------------------------------------------------------
+
+%% Run Action() only when the registration history is enabled.
+%% The result of Action() is discarded; 'ok' is always returned.
+when_hist_enabled(Action) ->
+    _ = is_hist_enabled() andalso Action(),
+    ok.
+
+%% Transactional variant: drop the old history records of the clientid, then
+%% write a fresh history record unless a channel pid is still registered for it.
+insert_hist_t(ClientId) ->
+    StillRegistered = delete_hist_t(ClientId),
+    maybe_write_hist_t(StillRegistered, ClientId).
+
+%% Write the history record only when no pid record was found.
+maybe_write_hist_t(true, _ClientId) ->
+    ok;
+maybe_write_hist_t(false, ClientId) ->
+    mnesia:write(?CHAN_REG_TAB, hist(ClientId), write).
+
+%% Dirty variant of the unregistration history insert.
+%% Dirty operations are not atomic: async pool workers may race deletes and
+%% inserts, so more than one history record may exist for a clientid for a while.
+%% This is expected to converge once the client re-registers or the periodic
+%% cleanup runs.
+insert_hist_d(ClientId) ->
+    %% drop stale history records before writing a new one
+    StillRegistered = delete_hist_d(ClientId),
+    case StillRegistered of
+        false ->
+            mria:dirty_write(?CHAN_REG_TAB, hist(ClientId));
+        true ->
+            ok
+    end.
+
+%% Current system (wall-clock) timestamp in seconds.
+%% Uses the time unit 'second'; the plural 'seconds' is a deprecated alias.
+now_ts() ->
+    erlang:system_time(second).
+
+%% Transactional delete of every history record of a clientid.
+%% Returns true when a (non-history) pid record was found among the records read.
+delete_hist_t(ClientId) ->
+    Records = mnesia:read(?CHAN_REG_TAB, ClientId, write),
+    Delete = fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end,
+    fold_hist(Delete, Records).
+
+%% Dirty delete of every history record of a clientid.
+%% Returns true when a (non-history) pid record was found among the records read.
+delete_hist_d(ClientId) ->
+    Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
+    Delete = fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end,
+    fold_hist(Delete, Records).
+
+%% Apply F to every history record (integer in the pid field) of the list.
+%% The result is true if at least one record holds something other than an
+%% integer in the pid field, i.e. a pid was found; otherwise false.
+fold_hist(F, List) ->
+    Visit =
+        fun
+            (#channel{pid = Ts} = Record, HasPid) when is_integer(Ts) ->
+                ok = F(Record),
+                HasPid;
+            (#channel{}, _HasPid) ->
+                true
+        end,
+    lists:foldl(Visit, false, List).
+
+%% Read the configured retain duration of the session registration history.
+-spec retain_duration() -> non_neg_integer().
+retain_duration() ->
+    ConfigPath = [broker, session_registration_history_retain],
+    emqx:get_config(ConfigPath).

+ 139 - 0
apps/emqx/src/emqx_cm_registry_cleaner.erl

@@ -0,0 +1,139 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc This module implements the global session registry history cleaner.
+-module(emqx_cm_registry_cleaner).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-include("emqx_cm.hrl").
+
+%% Start the cleaner as a locally registered gen_server named after this module.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% Only core nodes run the cleaner; on any other role the server is not started.
+init(_) ->
+    case mria_config:whoami() of
+        core ->
+            %% schedule the first cleanup round
+            ok = send_delay_start(),
+            {ok, #{next_clientid => undefined}};
+        _NotCore ->
+            ignore
+    end.
+
+%% No synchronous API: every call is answered with 'ok'.
+handle_call(_Call, _Caller, St) ->
+    {reply, ok, St}.
+
+%% No asynchronous API: casts are ignored.
+handle_cast(_Cast, St) ->
+    {noreply, St}.
+
+%% Handle the 'start' tick that drives the history cleanup.
+%%
+%% When history is enabled, one chunk of the registry table is scanned,
+%% resuming from the key remembered in the state:
+%%  - if the scan reached the end of the table, the next full scan is
+%%    scheduled after the regular cleanup delay and the cursor is reset;
+%%  - otherwise the next key is remembered and the next chunk is scheduled
+%%    immediately.
+%% When history is disabled, only the timer is re-armed.
+%% Any other message is ignored.
+handle_info(start, #{next_clientid := NextClientId} = State) ->
+    case is_hist_enabled() of
+        true ->
+            NewNext =
+                case cleanup_one_chunk(NextClientId) of
+                    '$end_of_table' ->
+                        ok = send_delay_start(),
+                        undefined;
+                    Id ->
+                        _ = erlang:garbage_collect(),
+                        %% More keys are left to scan: trigger the next chunk right away.
+                        %% Without re-arming here no further 'start' message would ever
+                        %% arrive and the cleanup would stall after the first chunk.
+                        ok = send_delay_start(0),
+                        Id
+                end,
+            {noreply, State#{next_clientid := NewNext}};
+        false ->
+            %% if not enabled, delay and check again
+            %% because it might be enabled from online config change while waiting
+            ok = send_delay_start(),
+            {noreply, State}
+    end;
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% Nothing to release on shutdown.
+terminate(_Why, _St) ->
+    ok.
+
+%% The state is carried over unchanged across code upgrades.
+code_change(_FromVsn, St, _Extra) ->
+    {ok, St}.
+
+%% Scan at most 10000 keys starting from NextClientId and delete the history
+%% records whose timestamp is older than the retain duration.
+%% Returns whatever cleanup_loop/3 returns: the key to resume from,
+%% or '$end_of_table'.
+cleanup_one_chunk(NextClientId) ->
+    RetainSeconds = retain_duration(),
+    NowSeconds = now_ts(),
+    Expired =
+        fun
+            (#channel{pid = Ts}) when is_integer(Ts) ->
+                Ts < NowSeconds - RetainSeconds;
+            (#channel{}) ->
+                %% a pid record, never expired
+                false
+        end,
+    cleanup_loop(NextClientId, 10000, Expired).
+
+%% Walk the registry table key by key, deleting the records for which
+%% IsExpired returns true.
+%% Stops when the key budget is used up (returning the key to resume from)
+%% or when the end of the table is reached (returning '$end_of_table').
+%% 'undefined' as the key means: start from the first key of the table.
+cleanup_loop(ClientId, 0, _IsExpired) ->
+    ClientId;
+cleanup_loop('$end_of_table', _Count, _IsExpired) ->
+    '$end_of_table';
+cleanup_loop(undefined, Count, IsExpired) ->
+    FirstClientId = mnesia:dirty_first(?CHAN_REG_TAB),
+    cleanup_loop(FirstClientId, Count, IsExpired);
+cleanup_loop(ClientId, Count, IsExpired) ->
+    Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
+    %% resolve the next key before deleting anything under the current one
+    NextClientId = mnesia:dirty_next(?CHAN_REG_TAB, ClientId),
+    _ = [mria:dirty_delete_object(?CHAN_REG_TAB, R) || R <- Records, IsExpired(R)],
+    cleanup_loop(NextClientId, Count - 1, IsExpired).
+
+%% History is enabled when the configured retain duration is greater than zero.
+is_hist_enabled() ->
+    RetainSeconds = retain_duration(),
+    RetainSeconds > 0.
+
+%% Read the configured retain duration (in seconds) of the session
+%% registration history.
+-spec retain_duration() -> non_neg_integer().
+retain_duration() ->
+    ConfigPath = [broker, session_registration_history_retain],
+    emqx:get_config(ConfigPath).
+
+%% Delay in milliseconds before the next full cleanup scan:
+%% a quarter of the retain duration, at least 1 ms and at most 2 minutes.
+cleanup_delay() ->
+    cleanup_delay(retain_duration(), timer:minutes(2)).
+
+%% Retain duration 0 means history is disabled: use the default delay so that
+%% an online config change is picked up.
+cleanup_delay(0, DefaultMs) ->
+    DefaultMs;
+cleanup_delay(RetainSeconds, DefaultMs) ->
+    QuarterMs = max(1, timer:seconds(RetainSeconds div 4)),
+    min(QuarterMs, DefaultMs).
+
+%% Schedule the next 'start' tick after the computed cleanup delay.
+send_delay_start() ->
+    ok = send_delay_start(cleanup_delay()).
+
+%% Schedule a 'start' message to this process after Delay milliseconds.
+send_delay_start(Delay) ->
+    _TimerRef = erlang:send_after(Delay, self(), start),
+    ok.
+
+%% Current system (wall-clock) timestamp in seconds.
+%% Uses the time unit 'second'; the plural 'seconds' is a deprecated alias.
+now_ts() ->
+    erlang:system_time(second).

+ 2 - 0
apps/emqx/src/emqx_cm_sup.erl

@@ -49,6 +49,7 @@ init([]) ->
     Locker = child_spec(emqx_cm_locker, 5000, worker),
     CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
     Registry = child_spec(emqx_cm_registry, 5000, worker),
+    RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
     DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
     Children =
@@ -58,6 +59,7 @@ init([]) ->
             Locker,
             CmPool,
             Registry,
+            RegistryCleaner,
             Manager,
             DSSessionGCSup
         ],

+ 45 - 29
apps/emqx/src/emqx_schema.erl

@@ -182,7 +182,7 @@
 -define(DEFAULT_MULTIPLIER, 1.5).
 -define(DEFAULT_BACKOFF, 0.75).
 
-namespace() -> broker.
+namespace() -> emqx.
 
 tags() ->
     [<<"EMQX">>].
@@ -230,7 +230,7 @@ roots(high) ->
         );
 roots(medium) ->
     [
-        {"broker",
+        {broker,
             sc(
                 ref("broker"),
                 #{
@@ -1347,24 +1347,43 @@ fields("deflate_opts") ->
     ];
 fields("broker") ->
     [
-        {"enable_session_registry",
+        {enable_session_registry,
             sc(
                 boolean(),
                 #{
                     default => true,
+                    importance => ?IMPORTANCE_HIGH,
                     desc => ?DESC(broker_enable_session_registry)
                 }
             )},
-        {"session_locking_strategy",
+        {session_registration_history_retain,
+            sc(
+                duration_s(),
+                #{
+                    default => <<"0s">>,
+                    importance => ?IMPORTANCE_LOW,
+                    desc => ?DESC("broker_session_registration_history_retain")
+                }
+            )},
+        {session_locking_strategy,
             sc(
                 hoconsc:enum([local, leader, quorum, all]),
                 #{
                     default => quorum,
+                    importance => ?IMPORTANCE_HIDDEN,
                     desc => ?DESC(broker_session_locking_strategy)
                 }
             )},
-        shared_subscription_strategy(),
-        {"shared_dispatch_ack_enabled",
+        %% moved to under mqtt root
+        {shared_subscription_strategy,
+            sc(
+                string(),
+                #{
+                    deprecated => {since, "5.1.0"},
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {shared_dispatch_ack_enabled,
             sc(
                 boolean(),
                 #{
@@ -1374,7 +1393,7 @@ fields("broker") ->
                     desc => ?DESC(broker_shared_dispatch_ack_enabled)
                 }
             )},
-        {"route_batch_clean",
+        {route_batch_clean,
             sc(
                 boolean(),
                 #{
@@ -1383,18 +1402,18 @@ fields("broker") ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )},
-        {"perf",
+        {perf,
             sc(
                 ref("broker_perf"),
                 #{importance => ?IMPORTANCE_HIDDEN}
             )},
-        {"routing",
+        {routing,
             sc(
                 ref("broker_routing"),
                 #{importance => ?IMPORTANCE_HIDDEN}
             )},
         %% FIXME: Need new design for shared subscription group
-        {"shared_subscription_group",
+        {shared_subscription_group,
             sc(
                 map(name, ref("shared_subscription_group")),
                 #{
@@ -3640,7 +3659,22 @@ mqtt_general() ->
                     desc => ?DESC(mqtt_shared_subscription)
                 }
             )},
-        shared_subscription_strategy(),
+        {"shared_subscription_strategy",
+            sc(
+                hoconsc:enum([
+                    random,
+                    round_robin,
+                    round_robin_per_group,
+                    sticky,
+                    local,
+                    hash_topic,
+                    hash_clientid
+                ]),
+                #{
+                    default => round_robin,
+                    desc => ?DESC(mqtt_shared_subscription_strategy)
+                }
+            )},
         {"exclusive_subscription",
             sc(
                 boolean(),
@@ -3846,24 +3880,6 @@ mqtt_session() ->
             )}
     ].
 
-shared_subscription_strategy() ->
-    {"shared_subscription_strategy",
-        sc(
-            hoconsc:enum([
-                random,
-                round_robin,
-                round_robin_per_group,
-                sticky,
-                local,
-                hash_topic,
-                hash_clientid
-            ]),
-            #{
-                default => round_robin,
-                desc => ?DESC(broker_shared_subscription_strategy)
-            }
-        )}.
-
 default_mem_check_interval() ->
     case emqx_os_mon:is_os_check_supported() of
         true -> <<"60s">>;

+ 16 - 2
rel/i18n/emqx_schema.hocon

@@ -1022,7 +1022,7 @@ fields_ws_opts_supported_subprotocols.desc:
 fields_ws_opts_supported_subprotocols.label:
 """Supported subprotocols"""
 
-broker_shared_subscription_strategy.desc:
+mqtt_shared_subscription_strategy.desc:
 """Dispatch strategy for shared subscription.
  - `random`: Randomly select a subscriber for dispatch;
  - `round_robin`: Messages from a single publisher are dispatched to subscribers in turn;
@@ -1420,7 +1420,21 @@ force_shutdown_enable.label:
 """Enable `force_shutdown` feature"""
 
 broker_enable_session_registry.desc:
-"""Enable session registry"""
+"""The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster.
+Recommendations for Use<br/>
+- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example, if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over.
+- Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases.
+
+Advantages of Disabling<br/>
+- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system.
+- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster."""
+
+broker_session_registration_history_retain.desc:
+"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact performance.
+This retained history can be used to monitor how many sessions were registered in the past configured duration.
+Note: This config has no effect if `enable_session_registry` is set to `false`.<br/>
+Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retain duration.<br/>
+Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect."""
 
 overload_protection_backoff_delay.desc:
 """The maximum duration of delay for background task execution during high load conditions."""