fix(dsrepl): make Raft index tracking more robust

Essentially by storing the Raft index outside of generations, in a separate
"globals" namespace in the default column family.
Andrew Mayorov, 1 year ago
Parent commit: e0c3075ca4
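
The idea, in isolation: a "global" is nothing more than a prefixed key/value pair written directly to the shard's default RocksDB column family, outside any storage generation. A minimal standalone sketch of the pattern, assuming only the erlang-rocksdb NIF (the module name, path and value below are made up for illustration; the real code lives in emqx_ds_storage_layer, see the diff below):

-module(globals_demo).                        %% illustrative only, not part of the commit
-export([run/0]).

run() ->
    {ok, DB} = rocksdb:open("/tmp/globals-demo", [{create_if_missing, true}]),
    %% Same "G/" prefix as the ?GLOBAL/1 macro, same key as ?DSREPL_RAFTIDX.
    Key = <<"G/", "dsrepl/ri">>,
    %% The Raft index is stored as an unsigned big-endian binary.
    Value = binary:encode_unsigned(12345),
    %% Non-durable write: skip the WAL, like store_global(..., #{durable => false}).
    ok = rocksdb:put(DB, Key, Value, [{disable_wal, true}]),
    {ok, Stored} = rocksdb:get(DB, Key, []),
    12345 = binary:decode_unsigned(Stored),
    rocksdb:close(DB).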

apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl (+18, -50)

@@ -993,30 +993,26 @@ apply(
                         Error
                 end
         end,
+    %% Always advance latest timestamp nonetheless, so it won't diverge on replay.
+    State = State0#{latest := Latest},
+    set_ts(DBShard, Latest),
     case Admission of
         true ->
             %% Plain batch, no preconditions.
             Result = store_batch_nondurable(DBShard, Operations),
-            State = State0#{latest := Latest},
-            set_ts(DBShard, Latest),
             Effects = try_release_log(Stats, RaftMeta, State);
         ok ->
             %% Preconditions succeeded, need to persist `Latest` in the storage layer.
-            UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
-            Result = store_batch_nondurable(DBShard, UpdateOps ++ Operations),
-            State = State0#{latest := Latest},
-            set_ts(DBShard, Latest),
+            Result = store_batch_nondurable(DBShard, Operations),
+            Result == ok andalso update_storage_raidx(DBShard, RaftIdx),
             Effects = try_release_log(Stats, RaftMeta, State);
         Result = false ->
             %% There are preconditions, but the batch looks outdated. Skip the batch.
             %% This is log replay, reply with `false`, noone expects the reply anyway.
-            State = State0#{latest := Latest},
-            set_ts(DBShard, Latest),
             Effects = [];
         PreconditionFailed = {precondition_failed, _} ->
             %% Preconditions failed. Skip the batch.
             Result = {error, unrecoverable, PreconditionFailed},
-            State = State0,
             Effects = [];
         Result = {error, unrecoverable, Reason} ->
             ?tp(error, "emqx_ds_replication_apply_batch_failed", #{
@@ -1027,13 +1023,12 @@ apply(
                     "Unrecoverable error storing committed batch. Replicas may diverge. "
                     "Consider rebuilding this shard replica from scratch."
             }),
-            State = State0,
             Effects = []
     end,
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
     {State, Result, Effects};
 apply(
-    RaftMeta = #{index := RaftIdx},
+    RaftMeta,
     #{?tag := add_generation, ?since := Since},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
@@ -1047,15 +1042,13 @@ apply(
     ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
-    UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
-    _Ok = emqx_ds_storage_layer:store_batch(DBShard, UpdateOps, #{durable => false}),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
     Effects = release_log(RaftMeta, State),
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
     {State, Result, Effects};
 apply(
-    RaftMeta = #{index := RaftIdx},
+    RaftMeta,
     #{?tag := update_config, ?since := Since, ?config := Opts},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
@@ -1070,8 +1063,6 @@ apply(
     ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
-    UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
-    _Ok = emqx_ds_storage_layer:store_batch(DBShard, UpdateOps, #{durable => false}),
     State = State0#{latest := Latest},
     Effects = release_log(RaftMeta, State),
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
@@ -1131,48 +1122,25 @@ store_batch_nondurable(DBShard = {DB, Shard}, Operations) ->
 
 %% Latest Raft index tracking
 %%
-%% Latest RaIdx is kept in a special, invisible from outside topic, at the timestamp
-%% matching the start of the latest storage generation. Each update goes to the same
-%% spot in the RocksDB, but this should not be a problem: writes are non-durable and
-%% issued only when preconditions are involved.
+%% Latest RaIdx is kept in a global, basically a KV pair in the default column family.
+%% Each update goes to the same spot in the RocksDB, but this should not be a problem:
+%% writes are non-durable and issued only when preconditions are involved.
 
--define(DSREPL_TOPIC(ENTRY), <<"$", 0:8, "/", (ENTRY):8>>).
--define(DSREPL_LATEST, 1).
+-define(DSREPL_RAFTIDX, <<"dsrepl/ri">>).
 
 read_storage_raidx(DBShard) ->
-    {_Id, #{since := Since}} = emqx_ds_storage_layer:find_generation(DBShard, current),
-    Matcher = #message_matcher{
-        from = <<>>,
-        topic = ?DSREPL_TOPIC(?DSREPL_LATEST),
-        timestamp = Since,
-        payload = '_'
-    },
-    case emqx_ds_storage_layer:lookup_message(DBShard, Matcher) of
-        #message{payload = Payload} when byte_size(Payload) =< 8 ->
-            {ok, binary:decode_unsigned(Payload)};
+    case emqx_ds_storage_layer:fetch_global(DBShard, ?DSREPL_RAFTIDX) of
+        {ok, V} when byte_size(V) =< 8 ->
+            {ok, binary:decode_unsigned(V)};
         not_found ->
             {ok, 0};
-        #message{payload = Payload} ->
-            {error, unrecoverable, {unexpected, Payload}};
-        {error, unrecoverable, _Reason} = Error ->
+        Error ->
             Error
     end.
 
-mk_update_storage_raidx(DBShard, Latest, RaIdx) ->
-    case emqx_ds_storage_layer:find_generation(DBShard, Latest) of
-        {_Id, #{since := Since}} ->
-            [
-                {Since, #message{
-                    id = <<>>,
-                    from = <<>>,
-                    topic = ?DSREPL_TOPIC(?DSREPL_LATEST),
-                    timestamp = Since,
-                    payload = binary:encode_unsigned(RaIdx)
-                }}
-            ];
-        not_found ->
-            []
-    end.
+update_storage_raidx(DBShard, RaftIdx) ->
+    KV = #{?DSREPL_RAFTIDX => binary:encode_unsigned(RaftIdx)},
+    ok = emqx_ds_storage_layer:store_global(DBShard, KV, #{durable => false}).
 
 %% Log truncation
 

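Worth noting about the hunks above: the Raft index is overwritten in place under a single key and the write bypasses the WAL, so a crash can lose the most recent value. The design accepts this: the write is issued only when preconditions are involved, and the latest timestamp is always advanced so state does not diverge on replay. The `byte_size(V) =< 8` guard in read_storage_raidx/1 only checks that the stored blob fits a 64-bit index. A quick shell round-trip of the encoding (plain OTP, nothing EMQX-specific):

1> RaftIdx = 123456789.
123456789
2> Bin = binary:encode_unsigned(RaftIdx).
<<7,91,205,21>>
3> byte_size(Bin) =< 8.  %% the guard in read_storage_raidx/1: fits in 64 bits
true
4> binary:decode_unsigned(Bin).
123456789
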
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl (+45, -3)

@@ -54,6 +54,10 @@
     drop_generation/2,
     find_generation/2,
 
+    %% Globals
+    store_global/3,
+    fetch_global/2,
+
     %% Snapshotting
     flush/1,
     take_snapshot/1,
@@ -243,7 +247,9 @@
     %% This data is used to create new generation:
     prototype := prototype(),
     %% Generations:
-    ?GEN_KEY(gen_id()) => GenData
+    ?GEN_KEY(gen_id()) => GenData,
+    %% DB handle (runtime only).
+    db => rocksdb:db_handle()
 }.

 %% Shard schema (persistent):
@@ -252,6 +258,8 @@
 %% Shard (runtime):
 -type shard() :: shard(generation()).
 
+-define(GLOBAL(K), <<"G/", K/binary>>).
+
 -type options() :: map().

 -type poll_iterators() :: [{_UserData, iterator()}].
@@ -677,6 +685,39 @@ delete_next(
             {ok, end_of_stream}
     end.
 
+-spec store_global(shard_id(), _KVs :: #{binary() => binary()}, batch_store_opts()) ->
+    ok | emqx_ds:error(_).
+store_global(ShardId, KVs, Options) ->
+    #{db := DB} = get_schema_runtime(ShardId),
+    {ok, Batch} = rocksdb:batch(),
+    try
+        ok = maps:foreach(fun(K, V) -> rocksdb:batch_put(Batch, ?GLOBAL(K), V) end, KVs),
+        WriteOpts = [{disable_wal, not maps:get(durable, Options, true)}],
+        Result = rocksdb:write_batch(DB, Batch, WriteOpts),
+        case Result of
+            ok ->
+                ok;
+            {error, {error, Reason}} ->
+                {error, unrecoverable, {rocksdb, Reason}}
+        end
+    after
+        rocksdb:release_batch(Batch)
+    end.
+
+-spec fetch_global(shard_id(), _Key :: binary()) ->
+    {ok, _Value :: binary()} | not_found | emqx_ds:error(_).
+fetch_global(ShardId, K) ->
+    #{db := DB} = get_schema_runtime(ShardId),
+    Result = rocksdb:get(DB, ?GLOBAL(K), _ReadOpts = []),
+    case Result of
+        {ok, _} ->
+            Result;
+        not_found ->
+            Result;
+        {error, Reason} ->
+            {error, unrecoverable, {rocksdb, Reason}}
+    end.
+
 -spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
     ok | {error, overlaps_existing_generations}.
 update_config(ShardId, Since, Options) ->
@@ -921,7 +962,7 @@ clear_all_checkpoints(ShardId) ->
     shard().
 open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     %% Transform generation schemas to generation runtime data:
-    maps:map(
+    Shard = maps:map(
         fun
             (?GEN_KEY(GenId), GenSchema) ->
                 open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
@@ -929,7 +970,8 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
                 Val
         end,
         ShardSchema
-    ).
+    ),
+    Shard#{db => DB}.
 
 -spec handle_add_generation(server_state(), emqx_ds:time()) ->
     server_state() | {error, overlaps_existing_generations}.
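
For completeness, a hedged usage sketch of the new storage-layer API from a caller's point of view, mirroring what update_storage_raidx/2 and read_storage_raidx/1 do in the replication layer. The shard id is hypothetical, and the fragment assumes the shard is already open so the runtime schema holds the `db` handle added above:

    %% Hypothetical shard id; store and fetch a global, as dsrepl does for the Raft index.
    ShardId = {some_db, <<"0">>},
    ok = emqx_ds_storage_layer:store_global(
        ShardId,
        #{<<"dsrepl/ri">> => binary:encode_unsigned(42)},
        #{durable => false}          %% written without the WAL
    ),
    {ok, V} = emqx_ds_storage_layer:fetch_global(ShardId, <<"dsrepl/ri">>),
    42 = binary:decode_unsigned(V).

Under the hood the key is stored as ?GLOBAL(<<"dsrepl/ri">>), i.e. prefixed with "G/", in the default column family, which keeps globals out of the way of per-generation data.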