Explorar o código

fix(ds): don't make data dir part of the schema

The data directory was ending up being persisted in the database schema.  This led to
issues when opening the DB on different nodes.
Thales Macedo Garitezi %!s(int64=2) %!d(string=hai) anos
pai
achega
8e31afe6c2

+ 1 - 3
apps/emqx/src/emqx_persistent_message.erl

@@ -61,16 +61,14 @@ force_ds() ->
     emqx_config:get([session_persistence, force_persistence]).
     emqx_config:get([session_persistence, force_persistence]).
 
 
 storage_backend(#{
 storage_backend(#{
-    builtin := Opts = #{
+    builtin := #{
         enable := true,
         enable := true,
         n_shards := NShards,
         n_shards := NShards,
         replication_factor := ReplicationFactor
         replication_factor := ReplicationFactor
     }
     }
 }) ->
 }) ->
-    DataDir = maps:get(data_dir, Opts, emqx:data_dir()),
     #{
     #{
         backend => builtin,
         backend => builtin,
-        data_dir => DataDir,
         storage => {emqx_ds_storage_bitfield_lts, #{}},
         storage => {emqx_ds_storage_bitfield_lts, #{}},
         n_shards => NShards,
         n_shards => NShards,
         replication_factor => ReplicationFactor
         replication_factor => ReplicationFactor

+ 1 - 0
apps/emqx/src/emqx_schema.erl

@@ -1896,6 +1896,7 @@ fields("session_storage_backend_builtin") ->
                 string(),
                 string(),
                 #{
                 #{
                     desc => ?DESC(session_builtin_data_dir),
                     desc => ?DESC(session_builtin_data_dir),
+                    mapping => "emqx_durable_storage.db_data_dir",
                     required => false,
                     required => false,
                     importance => ?IMPORTANCE_LOW
                     importance => ?IMPORTANCE_LOW
                 }
                 }

+ 11 - 14
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -36,7 +36,7 @@
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 
 %% internal exports:
 %% internal exports:
--export([db_dir/2]).
+-export([db_dir/1]).
 
 
 -export_type([
 -export_type([
     gen_id/0,
     gen_id/0,
@@ -52,6 +52,7 @@
 
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
+-define(APP, emqx_durable_storage).
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
 
 
 %%================================================================================
 %%================================================================================
@@ -199,13 +200,7 @@ open_shard(Shard, Options) ->
 
 
 -spec drop_shard(shard_id()) -> ok.
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
 drop_shard(Shard) ->
-    case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
-        undefined ->
-            ok;
-        BaseDir ->
-            ok = rocksdb:destroy(db_dir(BaseDir, Shard), []),
-            persistent_term:erase({?MODULE, Shard, base_dir})
-    end.
+    ok = rocksdb:destroy(db_dir(Shard), []).
 
 
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
     emqx_ds:store_batch_result().
@@ -589,8 +584,7 @@ rocksdb_open(Shard, Options) ->
         {enable_write_thread_adaptive_yield, false}
         {enable_write_thread_adaptive_yield, false}
         | maps:get(db_options, Options, [])
         | maps:get(db_options, Options, [])
     ],
     ],
-    DataDir = maps:get(data_dir, Options, emqx:data_dir()),
-    DBDir = db_dir(DataDir, Shard),
+    DBDir = db_dir(Shard),
     _ = filelib:ensure_dir(DBDir),
     _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
     ExistingCFs =
         case rocksdb:list_column_families(DBDir, DBOptions) of
         case rocksdb:list_column_families(DBDir, DBOptions) of
@@ -606,16 +600,19 @@ rocksdb_open(Shard, Options) ->
     ],
     ],
     case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
     case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
         {ok, DBHandle, [_CFDefault | CFRefs]} ->
         {ok, DBHandle, [_CFDefault | CFRefs]} ->
-            persistent_term:put({?MODULE, Shard, data_dir}, DataDir),
             {CFNames, _} = lists:unzip(ExistingCFs),
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, DBHandle, lists:zip(CFNames, CFRefs)};
             {ok, DBHandle, lists:zip(CFNames, CFRefs)};
         Error ->
         Error ->
             Error
             Error
     end.
     end.
 
 
--spec db_dir(file:filename(), shard_id()) -> file:filename().
-db_dir(BaseDir, {DB, ShardId}) ->
-    filename:join([BaseDir, atom_to_list(DB), binary_to_list(ShardId)]).
+-spec db_dir(shard_id()) -> file:filename().
+db_dir({DB, ShardId}) ->
+    filename:join([base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
+
+-spec base_dir() -> file:filename().
+base_dir() ->
+    application:get_env(?APP, db_data_dir, emqx:data_dir()).
 
 
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 update_last_until(Schema, Until) ->
 update_last_until(Schema, Until) ->