Przeglądaj źródła

Merge pull request #12739 from ieQu1/dev/zone-durability

feat(sessds): Add zone overrides for session durability settings
ieQu1 1 rok temu
rodzic
commit
8b963d5960

+ 18 - 5
apps/emqx/src/emqx_persistent_message.erl

@@ -19,9 +19,10 @@
 -behaviour(emqx_config_handler).
 
 -include("emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -export([init/0]).
--export([is_persistence_enabled/0, force_ds/0]).
+-export([is_persistence_enabled/0, is_persistence_enabled/1, force_ds/1]).
 
 %% Config handler
 -export([add_handler/0, pre_config_update/3]).
@@ -32,6 +33,7 @@
 ]).
 
 -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
 
 -define(WHEN_ENABLED(DO),
     case is_persistence_enabled() of
@@ -43,7 +45,14 @@
 %%--------------------------------------------------------------------
 
 init() ->
+    %% Note: currently persistence can't be enabled or disabled in the
+    %% runtime. If persistence is enabled for any of the zones, we
+    %% consider durability feature to be on:
+    Zones = maps:keys(emqx_config:get([zones])),
+    IsEnabled = lists:any(fun is_persistence_enabled/1, Zones),
+    persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
     ?WHEN_ENABLED(begin
+        ?SLOG(notice, #{msg => "Session durability is enabled"}),
         Backend = storage_backend(),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
         ok = emqx_persistent_session_ds_router:init_tables(),
@@ -53,7 +62,11 @@ init() ->
 
 -spec is_persistence_enabled() -> boolean().
 is_persistence_enabled() ->
-    emqx_config:get([session_persistence, enable]).
+    persistent_term:get(?PERSISTENCE_ENABLED).
+
+-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
+is_persistence_enabled(Zone) ->
+    emqx_config:get_zone_conf(Zone, [session_persistence, enable]).
 
 -spec storage_backend() -> emqx_ds:create_db_opts().
 storage_backend() ->
@@ -61,9 +74,9 @@ storage_backend() ->
 
 %% Dev-only option: force all messages to go through
 %% `emqx_persistent_session_ds':
--spec force_ds() -> boolean().
-force_ds() ->
-    emqx_config:get([session_persistence, force_persistence]).
+-spec force_ds(emqx_types:zone()) -> boolean().
+force_ds(Zone) ->
+    emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]).
 
 storage_backend(Path) ->
     ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),

+ 10 - 4
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -502,7 +502,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
     Timeout =
         case Publishes of
             [] ->
-                emqx_config:get([session_persistence, idle_poll_interval]);
+                get_config(ClientInfo, [idle_poll_interval]);
             [_ | _] ->
                 0
         end,
@@ -511,10 +511,10 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
 handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
     Session = replay_streams(Session0, ClientInfo),
     {ok, [], Session};
-handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
+handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
     S1 = emqx_persistent_session_ds_subs:gc(S0),
     S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
-    Interval = emqx_config:get([session_persistence, renew_streams_interval]),
+    Interval = get_config(ClientInfo, [renew_streams_interval]),
     Session = emqx_session:ensure_timer(
         ?TIMER_GET_STREAMS,
         Interval,
@@ -799,7 +799,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) ->
 fetch_new_messages([], Session, _ClientInfo) ->
     Session;
 fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
-    BatchSize = emqx_config:get([session_persistence, batch_size]),
+    BatchSize = get_config(ClientInfo, [batch_size]),
     case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
         true ->
             %% Buffer is full:
@@ -1055,9 +1055,15 @@ receive_maximum(ConnInfo) ->
 expiry_interval(ConnInfo) ->
     maps:get(expiry_interval, ConnInfo, 0).
 
+%% Note: we don't allow overriding `last_alive_update_interval' per
+%% zone, since the GC process is responsible for all sessions
+%% regardless of the zone.
 bump_interval() ->
     emqx_config:get([session_persistence, last_alive_update_interval]).
 
+get_config(#{zone := Zone}, Key) ->
+    emqx_config:get_zone_conf(Zone, [session_persistence | Key]).
+
 -spec try_get_live_session(emqx_types:clientid()) ->
     {pid(), session()} | not_found | not_persistent.
 try_get_live_session(ClientId) ->

+ 21 - 26
apps/emqx/src/emqx_session.erl

@@ -192,7 +192,7 @@ create(ClientInfo, ConnInfo) ->
 
 create(ClientInfo, ConnInfo, Conf) ->
     % FIXME error conditions
-    create(choose_impl_mod(ConnInfo), ClientInfo, ConnInfo, Conf).
+    create(hd(choose_impl_candidates(ClientInfo, ConnInfo)), ClientInfo, ConnInfo, Conf).
 
 create(Mod, ClientInfo, ConnInfo, Conf) ->
     % FIXME error conditions
@@ -205,7 +205,7 @@ create(Mod, ClientInfo, ConnInfo, Conf) ->
     {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
 open(ClientInfo, ConnInfo) ->
     Conf = get_session_conf(ClientInfo),
-    Mods = [Default | _] = choose_impl_candidates(ConnInfo),
+    Mods = [Default | _] = choose_impl_candidates(ClientInfo, ConnInfo),
     %% NOTE
     %% Try to look the existing session up in session stores corresponding to the given
     %% `Mods` in order, starting from the last one.
@@ -253,7 +253,7 @@ destroy(ClientInfo, ConnInfo) ->
     %% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' =
     %% true.  So we may simply destroy sessions from all implementations, since the key
     %% (ClientID) is the same.
-    Mods = choose_impl_candidates(ConnInfo),
+    Mods = choose_impl_candidates(ClientInfo, ConnInfo),
     lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods).
 
 -spec destroy(t()) -> ok.
@@ -610,31 +610,26 @@ maybe_mock_impl_mod(Session) ->
     error(noimpl, [Session]).
 -endif.
 
--spec choose_impl_mod(conninfo()) -> module().
-choose_impl_mod(#{expiry_interval := EI}) ->
-    hd(choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled())).
-
--spec choose_impl_candidates(conninfo()) -> [module()].
-choose_impl_candidates(#{expiry_interval := EI}) ->
-    choose_impl_candidates(EI, emqx_persistent_message:is_persistence_enabled()).
-
-choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
-    [emqx_session_mem];
-choose_impl_candidates(0, _IsPSStoreEnabled = true) ->
-    case emqx_persistent_message:force_ds() of
+choose_impl_candidates(#{zone := Zone}, #{expiry_interval := EI}) ->
+    case emqx_persistent_message:is_persistence_enabled(Zone) of
         false ->
-            %% NOTE
-            %% If ExpiryInterval is 0, the natural choice is
-            %% `emqx_session_mem'. Yet we still need to look the
-            %% existing session up in the `emqx_persistent_session_ds'
-            %% store first, because previous connection may have set
-            %% ExpiryInterval to a non-zero value.
-            [emqx_session_mem, emqx_persistent_session_ds];
+            [emqx_session_mem];
         true ->
-            [emqx_persistent_session_ds]
-    end;
-choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 ->
-    [emqx_persistent_session_ds].
+            Force = emqx_persistent_message:force_ds(Zone),
+            case EI of
+                0 when not Force ->
+                    %% NOTE
+                    %% If ExpiryInterval is 0, the natural choice is
+                    %% `emqx_session_mem'. Yet we still need to look
+                    %% the existing session up in the
+                    %% `emqx_persistent_session_ds' store first,
+                    %% because previous connection may have set
+                    %% ExpiryInterval to a non-zero value.
+                    [emqx_session_mem, emqx_persistent_session_ds];
+                _ ->
+                    [emqx_persistent_session_ds]
+            end
+    end.
 
 -compile({inline, [run_hook/2]}).
 run_hook(Name, Args) ->

+ 2 - 1
apps/emqx/src/emqx_zone_schema.erl

@@ -33,7 +33,8 @@ roots() ->
         force_shutdown,
         conn_congestion,
         force_gc,
-        overload_protection
+        overload_protection,
+        session_persistence
     ].
 
 zones_without_default() ->

+ 13 - 1
apps/emqx/test/emqx_config_SUITE.erl

@@ -463,5 +463,17 @@ zone_global_defaults() ->
                 backoff_new_conn => true,
                 enable => false
             },
-        stats => #{enable => true}
+        stats => #{enable => true},
+        session_persistence =>
+            #{
+                enable => false,
+                batch_size => 100,
+                force_persistence => false,
+                idle_poll_interval => 100,
+                last_alive_update_interval => 5000,
+                message_retention_period => 86400000,
+                renew_streams_interval => 5000,
+                session_gc_batch_size => 100,
+                session_gc_interval => 600000
+            }
     }.

+ 2 - 0
changes/ce/feat-12739.en.md

@@ -0,0 +1,2 @@
+Make it possible to override `session_persistence` settings per zone.
+Since durable sessions are inherently more expensive to maintain than the regular sessions, it's desirable to grant the operator finer control of session durability for different classes of clients.