Explorar o código

feat(sessds): Move config schema to a separate root

ieQu1 %!s(int64=2) %!d(string=hai) anos
pai
achega
24337ecec7

+ 9 - 0
apps/emqx/src/emqx_config.erl

@@ -497,6 +497,15 @@ fill_defaults(RawConf, Opts) ->
     ).
 
 -spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
+fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
+    %% FIXME: kludge to prevent `emqx_config' module from filling in
+    %% the default values for backends and layouts. These records are
+    %% inside unions, and adding default values there will add
+    %% incompatible fields.
+    %%
+    %% Note: this function is called for each individual conf root, so
+    %% this clause only affects this particular subtree.
+    RawConf;
 fill_defaults(SchemaMod, RawConf, Opts0) ->
     Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
     hocon_tconf:check_plain(

+ 245 - 0
apps/emqx/src/emqx_ds_schema.erl

@@ -0,0 +1,245 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Schema for EMQX_DS databases.
+-module(emqx_ds_schema).
+
+%% API:
+-export([schema/0, translate_builtin/1]).
+
+%% Behavior callbacks:
+-export([fields/1, desc/1, namespace/0]).
+
+-include("emqx_schema.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("hocon/include/hocon_types.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%%================================================================================
+%% API
+%%================================================================================
+
+translate_builtin(#{
+    backend := builtin,
+    n_shards := NShards,
+    replication_factor := ReplFactor,
+    layout := Layout
+}) ->
+    Storage =
+        case Layout of
+            #{
+                type := wildcard_optimized,
+                bits_per_topic_level := BitsPerTopicLevel,
+                epoch_bits := EpochBits,
+                topic_index_bytes := TIBytes
+            } ->
+                {emqx_ds_storage_bitfield_lts, #{
+                    bits_per_topic_level => BitsPerTopicLevel,
+                    topic_index_bytes => TIBytes,
+                    epoch_bits => EpochBits
+                }};
+            #{type := reference} ->
+                {emqx_ds_storage_reference, #{}}
+        end,
+    #{
+        backend => builtin,
+        n_shards => NShards,
+        replication_factor => ReplFactor,
+        storage => Storage
+    }.
+
+%%================================================================================
+%% Behavior callbacks
+%%================================================================================
+
+namespace() ->
+    durable_storage.
+
+schema() ->
+    [
+        {"messages",
+            ds_schema(#{
+                desc => ?DESC(messages),
+                importance => ?IMPORTANCE_HIDDEN,
+                default =>
+                    #{
+                        <<"backend">> => builtin
+                    }
+            })}
+    ].
+
+fields("builtin") ->
+    %% Schema for the builtin backend:
+    [
+        {"backend",
+            sc(
+                builtin,
+                #{
+                    importance => ?IMPORTANCE_MEDIUM,
+                    'readOnly' => true,
+                    default => builtin,
+                    desc => ?DESC(builtin)
+                }
+            )},
+        {"_config_handler",
+            sc(
+                {module(), atom()},
+                #{
+                    importance => ?IMPORTANCE_HIDDEN,
+                    'readOnly' => true,
+                    default => {?MODULE, translate_builtin}
+                }
+            )},
+        {"data_dir",
+            sc(
+                string(),
+                #{
+                    desc => ?DESC(builtin_data_dir),
+                    mapping => "emqx_durable_storage.db_data_dir",
+                    required => false,
+                    importance => ?IMPORTANCE_MEDIUM
+                }
+            )},
+        {"n_shards",
+            sc(
+                pos_integer(),
+                #{
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(builtin_n_shards),
+                    default => 16
+                }
+            )},
+        {"replication_factor",
+            sc(
+                pos_integer(),
+                #{
+                    default => 3,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"egress",
+            sc(
+                ref("builtin_egress"),
+                #{
+                    desc => ?DESC(builtin_egress),
+                    importance => ?IMPORTANCE_MEDIUM
+                }
+            )},
+        {"layout",
+            sc(
+                hoconsc:union([
+                    ref("layout_builtin_wildcard_optimized"), ref("layout_builtin_reference")
+                ]),
+                #{
+                    desc => ?DESC(builtin_layout),
+                    importance => ?IMPORTANCE_HIDDEN,
+                    default =>
+                        #{
+                            <<"type">> => wildcard_optimized
+                        }
+                }
+            )}
+    ];
+fields("builtin_egress") ->
+    [
+        {"max_items",
+            sc(
+                pos_integer(),
+                #{
+                    default => 1000,
+                    mapping => "emqx_durable_storage.egress_batch_size",
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"flush_interval",
+            sc(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => 100,
+                    mapping => "emqx_durable_storage.egress_flush_interval",
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ];
+fields("layout_builtin_wildcard_optimized") ->
+    [
+        {"type",
+            sc(
+                wildcard_optimized,
+                #{
+                    desc => ?DESC(layout_wildcard_optimized),
+                    'readOnly' => true,
+                    default => wildcard_optimized
+                }
+            )},
+        {"bits_per_topic_level",
+            sc(
+                range(1, 64),
+                #{
+                    default => 64,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"epoch_bits",
+            sc(
+                range(0, 64),
+                #{
+                    default => 10,
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(wildcard_optimized_epoch_bits)
+                }
+            )},
+        {"topic_index_bytes",
+            sc(
+                pos_integer(),
+                #{
+                    default => 4,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ];
+fields("layout_builtin_reference") ->
+    [
+        {"type",
+            sc(
+                reference,
+                #{'readOnly' => true}
+            )}
+    ].
+
+desc(_) ->
+    undefined.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+ds_schema(Options) ->
+    sc(
+        hoconsc:union([
+            ref("builtin")
+            | emqx_schema_hooks:injection_point('durable_storage.backends', [])
+        ]),
+        Options
+    ).
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+ref(StructName) -> hoconsc:ref(?MODULE, StructName).

+ 4 - 18
apps/emqx/src/emqx_persistent_message.erl

@@ -52,7 +52,7 @@ is_persistence_enabled() ->
 
 -spec storage_backend() -> emqx_ds:create_db_opts().
 storage_backend() ->
-    storage_backend(emqx_config:get([session_persistence, storage])).
+    storage_backend([durable_storage, messages]).
 
 %% Dev-only option: force all messages to go through
 %% `emqx_persistent_session_ds':
@@ -60,23 +60,9 @@ storage_backend() ->
 force_ds() ->
     emqx_config:get([session_persistence, force_persistence]).
 
-storage_backend(#{
-    builtin := #{
-        enable := true,
-        n_shards := NShards,
-        replication_factor := ReplicationFactor
-    }
-}) ->
-    #{
-        backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}},
-        n_shards => NShards,
-        replication_factor => ReplicationFactor
-    };
-storage_backend(#{
-    fdb := #{enable := true} = FDBConfig
-}) ->
-    FDBConfig#{backend => fdb}.
+storage_backend(Path) ->
+    ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
+    apply(Module, Function, [ConfigTree]).
 
 %%--------------------------------------------------------------------
 

+ 7 - 84
apps/emqx/src/emqx_schema.erl

@@ -254,6 +254,11 @@ roots(medium) ->
             sc(
                 ref("overload_protection"),
                 #{importance => ?IMPORTANCE_HIDDEN}
+            )},
+        {"durable_storage",
+            sc(
+                ref("durable_storage"),
+                #{importance => ?IMPORTANCE_HIDDEN}
             )}
     ];
 roots(low) ->
@@ -1654,16 +1659,6 @@ fields("session_persistence") ->
                     default => false
                 }
             )},
-        {"storage",
-            sc(
-                ref("session_storage_backend"), #{
-                    desc => ?DESC(session_persistence_storage),
-                    validator => fun validate_backend_enabled/1,
-                    default => #{
-                        <<"builtin">> => #{}
-                    }
-                }
-            )},
         {"max_batch_size",
             sc(
                 pos_integer(),
@@ -1739,69 +1734,8 @@ fields("session_persistence") ->
                 }
             )}
     ];
-fields("session_storage_backend") ->
-    [
-        {"builtin",
-            sc(ref("session_storage_backend_builtin"), #{
-                desc => ?DESC(session_storage_backend_builtin),
-                required => {false, recursively}
-            })}
-    ] ++ emqx_schema_hooks:injection_point('session_persistence.storage_backends', []);
-fields("session_storage_backend_builtin") ->
-    [
-        {"enable",
-            sc(
-                boolean(),
-                #{
-                    desc => ?DESC(session_storage_backend_enable),
-                    default => true
-                }
-            )},
-        {"data_dir",
-            sc(
-                string(),
-                #{
-                    desc => ?DESC(session_builtin_data_dir),
-                    mapping => "emqx_durable_storage.db_data_dir",
-                    required => false,
-                    importance => ?IMPORTANCE_LOW
-                }
-            )},
-        {"n_shards",
-            sc(
-                pos_integer(),
-                #{
-                    desc => ?DESC(session_builtin_n_shards),
-                    default => 16
-                }
-            )},
-        {"replication_factor",
-            sc(
-                pos_integer(),
-                #{
-                    default => 3,
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"egress_batch_size",
-            sc(
-                pos_integer(),
-                #{
-                    default => 1000,
-                    mapping => "emqx_durable_storage.egress_batch_size",
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"egress_flush_interval",
-            sc(
-                timeout_duration_ms(),
-                #{
-                    default => 100,
-                    mapping => "emqx_durable_storage.egress_flush_interval",
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )}
-    ].
+fields("durable_storage") ->
+    emqx_ds_schema:schema().
 
 mqtt_listener(Bind) ->
     base_listener(Bind) ++
@@ -2077,17 +2011,6 @@ ensure_list(V) ->
 filter(Opts) ->
     [{K, V} || {K, V} <- Opts, V =/= undefined].
 
-validate_backend_enabled(Config) ->
-    Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
-    case maps:to_list(Enabled) of
-        [{_Type, _BackendConfig}] ->
-            ok;
-        _Conflicts = [_ | _] ->
-            {error, multiple_enabled_backends};
-        _None = [] ->
-            {error, no_enabled_backend}
-    end.
-
 %% @private This function defines the SSL opts which are commonly used by
 %% SSL listener and client.
 -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().

+ 1 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -50,7 +50,7 @@ init_per_testcase(t_message_gc = TestCase, Config) ->
     Opts = #{
         extra_emqx_conf =>
             "\n  session_persistence.message_retention_period = 1s"
-            "\n  session_persistence.storage.builtin.n_shards = 3"
+            "\n  durable_storage.messages.n_shards = 3"
     },
     common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
 init_per_testcase(TestCase, Config) ->

+ 35 - 0
rel/i18n/emqx_ds_schema.hocon

@@ -0,0 +1,35 @@
+emqx_ds_schema {
+
+messages.desc:
+"""Configuration related to the durable storage of MQTT messages."""
+
+builtin.desc:
+"""Builtin session storage backend utilizing embedded RocksDB key-value store."""
+
+builtin_data_dir.desc:
+"""File system directory where the database is located."""
+
+builtin_n_shards.desc:
+"""The builtin durable storage partitions data into shards.
+This configuration parameter defines the number of shards.
+Please note that it takes effect only during the initialization of the durable storage database.
+Changing this configuration parameter after the database has been already created won't take any effect."""
+
+builtin_egress.desc:
+"""Configuration related to the buffering of messages from the local node to the shard leader."""
+
+builtin_layout.desc:
+"""Storage layout is a method of arranging messages from various topics and clients on disc.
+
+Depending on the type of workload and the topic structure, different types of strategies for storing the data can be employed to maximize efficency of the replay."""
+
+
+layout_wildcard_optimized.desc:
+"""_Wildcard-optimized_ layout is designed to maximize the throughput of the wildcard subscriptions covering large numbers of topics."""
+
+wildcard_optimized_epoch_bits.desc:
+"""Wildcard-optimized layout partitions messages recorded at different times into "epochs".
+Each epoch can be consumed by the subscribers as a batch.
+Generally, larger epochs lead to higher throughput of subscribers, however currently they may increase latency."""
+
+}