Просмотр исходного кода

feat(dssubs): inject DB schema for leader store

Andrew Mayorov 1 год назад
Родитель
Сommit
7c7fd2d735

+ 1 - 1
apps/emqx/src/emqx_ds_schema.erl

@@ -99,7 +99,7 @@ schema() ->
                 importance => ?IMPORTANCE_MEDIUM,
                 desc => ?DESC(messages)
             })}
-    ].
+    ] ++ emqx_schema_hooks:injection_point('durable_storage', []).
 
 storage_schema(ExtraOptions) ->
     Options = #{

+ 6 - 0
apps/emqx_conf/src/emqx_conf_schema_inject.erl

@@ -27,6 +27,7 @@ schemas(Edition) ->
         cluster_linking(Edition) ++
         authn(Edition) ++
         authz() ++
+        shared_subs(Edition) ++
         customized(Edition).
 
 mria(ce) ->
@@ -81,6 +82,11 @@ authz_mods() ->
         emqx_authz_ldap_schema
     ].
 
+shared_subs(ee) ->
+    [emqx_ds_shared_sub_schema];
+shared_subs(ce) ->
+    [].
+
 %% Add more schemas here.
 customized(_Edition) ->
     [].

+ 8 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_store.erl

@@ -37,12 +37,18 @@
 %%
 
 open() ->
-    %% FIXME
-    emqx_ds:open_db(?DS_DB, emqx_ds_schema:db_config([durable_storage, messages])).
+    emqx_ds:open_db(?DS_DB, db_config()).
 
 close() ->
     emqx_ds:close_db(?DS_DB).
 
+db_config() ->
+    Config = emqx_ds_schema:db_config([durable_storage, queues]),
+    Config#{
+        atomic_batches => true,
+        force_monotonic_timestamps => false
+    }.
+
 %%
 
 -spec claim_leadership(group(), ID, emqx_message:timestamp()) ->

+ 15 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl

@@ -13,6 +13,10 @@
     desc/1
 ]).
 
+-export([
+    injected_fields/0
+]).
+
 namespace() -> emqx_shared_subs.
 
 roots() ->
@@ -42,6 +46,17 @@ fields(durable_queues) ->
         duration(leader_session_not_replaying_timeout_ms, 5000)
     ].
 
+injected_fields() ->
+    #{
+        'durable_storage' => [
+            {queues,
+                emqx_ds_schema:storage_schema(#{
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(durable_queues_storage)
+                })}
+        ]
+    }.
+
 duration(MsFieldName, Default) ->
     {MsFieldName,
         ?HOCON(