Explorar el Código

feat(ds): Add an API for DB-global variables

ieQu1 hace 1 año
padre
commit
8ac9700aab

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -118,6 +118,7 @@ which_dbs() ->
 init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
+    emqx_ds_builtin_sup:clean_gvars(DB),
     emqx_ds_builtin_metrics:init_for_db(DB),
     Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
     ok = start_ra_system(DB, Opts),

+ 29 - 1
apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl

@@ -23,6 +23,7 @@
 
 %% API:
 -export([start_db/2, stop_db/1]).
+-export([set_gvar/3, get_gvar/3, clean_gvars/1]).
 
 %% behavior callbacks:
 -export([init/1]).
@@ -39,6 +40,13 @@
 -define(top, ?MODULE).
 -define(databases, emqx_ds_builtin_databases_sup).
 
+-define(gvar_tab, emqx_ds_builtin_gvar).
+
+-record(gvar, {
+    k :: {emqx_ds:db(), _Key},
+    v :: _Value
+}).
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -61,11 +69,30 @@ stop_db(DB) ->
         Pid when is_pid(Pid) ->
             _ = supervisor:terminate_child(?databases, DB),
             _ = supervisor:delete_child(?databases, DB),
-            ok;
+            clean_gvars(DB);
         undefined ->
             ok
     end.
 
+%% @doc Set a DB-global variable. Please don't abuse this API.
+-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok.
+set_gvar(DB, Key, Val) ->
+    ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}).
+
+-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val.
+get_gvar(DB, Key, Default) ->
+    case ets:lookup(?gvar_tab, {DB, Key}) of
+        [#gvar{v = Val}] ->
+            Val;
+        [] ->
+            Default
+    end.
+
+-spec clean_gvars(emqx_ds:db()) -> ok.
+clean_gvars(DB) ->
+    ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}),
+    ok.
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
@@ -96,6 +123,7 @@ init(?top) ->
         type => supervisor,
         shutdown => infinity
     },
+    ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]),
     %%
     SupFlags = #{
         strategy => one_for_all,

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -363,6 +363,7 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
         end,
     integer_to_binary(Hash).
 
+-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok.
 foreach_shard(DB, Fun) ->
     lists:foreach(Fun, list_shards(DB)).