Procházet zdrojové kódy

fix(dsrepl): make db + shard part of machine state

It doesn't feel right, but right now is the easiest way to have it
in the scope of `apply/3`, because `init/1` doesn't actually invoked
for ra machines recovered from the existing log / snapshot.
Andrew Mayorov před 2 roky
rodič
revize
4dafbf21f6

+ 9 - 10
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -574,8 +574,7 @@ ra_drop_shard(DB, Shard) ->
 %%
 %%
 
 
 init(#{db := DB, shard := Shard}) ->
 init(#{db := DB, shard := Shard}) ->
-    _ = erlang:put(emqx_ds_db_shard, {DB, Shard}),
-    #{latest => 0}.
+    #{db_shard => {DB, Shard}, latest => 0}.
 
 
 apply(
 apply(
     #{index := RaftIdx},
     #{index := RaftIdx},
@@ -583,7 +582,7 @@ apply(
         ?tag := ?BATCH,
         ?tag := ?BATCH,
         ?batch_messages := MessagesIn
         ?batch_messages := MessagesIn
     },
     },
-    #{latest := Latest} = State
+    #{db_shard := DBShard, latest := Latest} = State
 ) ->
 ) ->
     %% NOTE
     %% NOTE
     %% Unique timestamp tracking real time closely.
     %% Unique timestamp tracking real time closely.
@@ -597,7 +596,7 @@ apply(
     %% currently relies on wall clock time to decide if it's safe to iterate over
     %% currently relies on wall clock time to decide if it's safe to iterate over
     %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
     %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
     %% time instead.
     %% time instead.
-    Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
+    Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
     NState = State#{latest := NLatest},
     NState = State#{latest := NLatest},
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     Effect = {release_cursor, RaftIdx, NState},
     Effect = {release_cursor, RaftIdx, NState},
@@ -605,23 +604,23 @@ apply(
 apply(
 apply(
     _RaftMeta,
     _RaftMeta,
     #{?tag := add_generation},
     #{?tag := add_generation},
-    State
+    #{db_shard := DBShard} = State
 ) ->
 ) ->
-    Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)),
+    Result = emqx_ds_storage_layer:add_generation(DBShard),
     {State, Result};
     {State, Result};
 apply(
 apply(
     _RaftMeta,
     _RaftMeta,
     #{?tag := update_config, ?config := Opts},
     #{?tag := update_config, ?config := Opts},
-    State
+    #{db_shard := DBShard} = State
 ) ->
 ) ->
-    Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts),
+    Result = emqx_ds_storage_layer:update_config(DBShard, Opts),
     {State, Result};
     {State, Result};
 apply(
 apply(
     _RaftMeta,
     _RaftMeta,
     #{?tag := drop_generation, ?generation := GenId},
     #{?tag := drop_generation, ?generation := GenId},
-    State
+    #{db_shard := DBShard} = State
 ) ->
 ) ->
-    Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId),
+    Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
     {State, Result}.
     {State, Result}.
 
 
 assign_timestamps(Latest, Messages) ->
 assign_timestamps(Latest, Messages) ->