Sfoglia il codice sorgente

Merge pull request #12340 from thalesmg/ds-db-data-path-m-20240116

feat(ds): allow customizing the data directory
Thales Macedo Garitezi 2 anni fa
parent
commit
39090d3732

+ 7 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -61,10 +61,16 @@ force_ds() ->
     emqx_config:get([session_persistence, force_persistence]).
     emqx_config:get([session_persistence, force_persistence]).
 
 
 storage_backend(#{
 storage_backend(#{
-    builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor}
+    builtin := #{
+        enable := true,
+        data_dir := DataDir,
+        n_shards := NShards,
+        replication_factor := ReplicationFactor
+    }
 }) ->
 }) ->
     #{
     #{
         backend => builtin,
         backend => builtin,
+        data_dir => DataDir,
         storage => {emqx_ds_storage_bitfield_lts, #{}},
         storage => {emqx_ds_storage_bitfield_lts, #{}},
         n_shards => NShards,
         n_shards => NShards,
         replication_factor => ReplicationFactor
         replication_factor => ReplicationFactor

+ 30 - 0
apps/emqx/src/emqx_schema.erl

@@ -94,6 +94,7 @@
     non_empty_string/1,
     non_empty_string/1,
     validations/0,
     validations/0,
     naive_env_interpolation/1,
     naive_env_interpolation/1,
+    ensure_unicode_path/2,
     validate_server_ssl_opts/1,
     validate_server_ssl_opts/1,
     validate_tcp_keepalive/1,
     validate_tcp_keepalive/1,
     parse_tcp_keepalive/1
     parse_tcp_keepalive/1
@@ -1882,6 +1883,18 @@ fields("session_storage_backend_builtin") ->
                     default => true
                     default => true
                 }
                 }
             )},
             )},
+        {"data_dir",
+            sc(
+                string(),
+                #{
+                    desc => ?DESC(session_builtin_data_dir),
+                    default => <<"${EMQX_DATA_DIR}">>,
+                    importance => ?IMPORTANCE_LOW,
+                    converter => fun(Path, Opts) ->
+                        naive_env_interpolation(ensure_unicode_path(Path, Opts))
+                    end
+                }
+            )},
         {"n_shards",
         {"n_shards",
             sc(
             sc(
                 pos_integer(),
                 pos_integer(),
@@ -3836,3 +3849,20 @@ tags_schema() ->
             importance => ?IMPORTANCE_LOW
             importance => ?IMPORTANCE_LOW
         }
         }
     ).
     ).
+
+ensure_unicode_path(undefined, _) ->
+    undefined;
+ensure_unicode_path(Path, #{make_serializable := true}) ->
+    %% format back to serializable string
+    unicode:characters_to_binary(Path, utf8);
+ensure_unicode_path(Path, Opts) when is_binary(Path) ->
+    case unicode:characters_to_list(Path, utf8) of
+        {R, _, _} when R =:= error orelse R =:= incomplete ->
+            throw({"bad_file_path_string", Path});
+        PathStr ->
+            ensure_unicode_path(PathStr, Opts)
+    end;
+ensure_unicode_path(Path, _) when is_list(Path) ->
+    Path;
+ensure_unicode_path(Path, _) ->
+    throw({"not_string", Path}).

+ 2 - 16
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -1430,22 +1430,8 @@ convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
 convert_rotation(Count, _Opts) when is_integer(Count) -> Count;
 convert_rotation(Count, _Opts) when is_integer(Count) -> Count;
 convert_rotation(Count, _Opts) -> throw({"bad_rotation", Count}).
 convert_rotation(Count, _Opts) -> throw({"bad_rotation", Count}).
 
 
-ensure_unicode_path(undefined, _) ->
-    undefined;
-ensure_unicode_path(Path, #{make_serializable := true}) ->
-    %% format back to serializable string
-    unicode:characters_to_binary(Path, utf8);
-ensure_unicode_path(Path, Opts) when is_binary(Path) ->
-    case unicode:characters_to_list(Path, utf8) of
-        {R, _, _} when R =:= error orelse R =:= incomplete ->
-            throw({"bad_file_path_string", Path});
-        PathStr ->
-            ensure_unicode_path(PathStr, Opts)
-    end;
-ensure_unicode_path(Path, _) when is_list(Path) ->
-    Path;
-ensure_unicode_path(Path, _) ->
-    throw({"not_string", Path}).
+ensure_unicode_path(Path, Opts) ->
+    emqx_schema:ensure_unicode_path(Path, Opts).
 
 
 log_level() ->
 log_level() ->
     hoconsc:enum([debug, info, notice, warning, error, critical, alert, emergency, all]).
     hoconsc:enum([debug, info, notice, warning, error, critical, alert, emergency, all]).

+ 14 - 6
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -34,7 +34,7 @@
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 
 %% internal exports:
 %% internal exports:
--export([db_dir/1]).
+-export([db_dir/2]).
 
 
 -export_type([
 -export_type([
     gen_id/0,
     gen_id/0,
@@ -168,7 +168,13 @@ open_shard(Shard, Options) ->
 -spec drop_shard(shard_id()) -> ok.
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
 drop_shard(Shard) ->
     catch emqx_ds_storage_layer_sup:stop_shard(Shard),
     catch emqx_ds_storage_layer_sup:stop_shard(Shard),
-    ok = rocksdb:destroy(db_dir(Shard), []).
+    case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
+        undefined ->
+            ok;
+        BaseDir ->
+            ok = rocksdb:destroy(db_dir(BaseDir, Shard), []),
+            persistent_term:erase({?MODULE, Shard, base_dir})
+    end.
 
 
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
     emqx_ds:store_batch_result().
@@ -424,7 +430,8 @@ rocksdb_open(Shard, Options) ->
         {create_missing_column_families, true}
         {create_missing_column_families, true}
         | maps:get(db_options, Options, [])
         | maps:get(db_options, Options, [])
     ],
     ],
-    DBDir = db_dir(Shard),
+    DataDir = maps:get(data_dir, Options, emqx:data_dir()),
+    DBDir = db_dir(DataDir, Shard),
     _ = filelib:ensure_dir(DBDir),
     _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
     ExistingCFs =
         case rocksdb:list_column_families(DBDir, DBOptions) of
         case rocksdb:list_column_families(DBDir, DBOptions) of
@@ -440,15 +447,16 @@ rocksdb_open(Shard, Options) ->
     ],
     ],
     case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
     case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
         {ok, DBHandle, [_CFDefault | CFRefs]} ->
         {ok, DBHandle, [_CFDefault | CFRefs]} ->
+            persistent_term:put({?MODULE, Shard, data_dir}, DataDir),
             {CFNames, _} = lists:unzip(ExistingCFs),
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, DBHandle, lists:zip(CFNames, CFRefs)};
             {ok, DBHandle, lists:zip(CFNames, CFRefs)};
         Error ->
         Error ->
             Error
             Error
     end.
     end.
 
 
--spec db_dir(shard_id()) -> file:filename().
-db_dir({DB, ShardId}) ->
-    filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
+-spec db_dir(file:filename(), shard_id()) -> file:filename().
+db_dir(BaseDir, {DB, ShardId}) ->
+    filename:join([BaseDir, atom_to_list(DB), binary_to_list(ShardId)]).
 
 
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 update_last_until(Schema, Until) ->
 update_last_until(Schema, Until) ->