
Merge pull request #14229 from keynslug/fix/EMQX-12848/raft-precond-idemp

fix(dsrepl): ensure preconditions are idempotent under log replays
Andrew Mayorov, 1 year ago
parent commit c2b65ced3f

+ 1 - 1
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft.app.src

@@ -2,7 +2,7 @@
 {application, emqx_ds_builtin_raft, [
     {description, "Raft replication layer for the durable storage"},
     % strict semver, bump manually!
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, gproc, mria, ra, emqx_durable_storage]},

+ 123 - 21
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -14,6 +14,7 @@
     open_db/2,
     close_db/1,
     add_generation/1,
+    add_generation/2,
     update_db_config/2,
     list_generations_with_lifetimes/1,
     drop_generation/2,
@@ -212,17 +213,22 @@ close_db(DB) ->
 
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
+    add_generation(DB, emqx_ds:timestamp_us()).
+
+-spec add_generation(emqx_ds:db(), emqx_ds:time()) -> ok | {error, _}.
+add_generation(DB, Since) ->
     foreach_shard(
         DB,
-        fun(Shard) -> ok = ra_add_generation(DB, Shard) end
+        fun(Shard) -> ok = ra_add_generation(DB, Shard, Since) end
     ).
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 update_db_config(DB, CreateOpts) ->
     Opts = #{} = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
+    Since = emqx_ds:timestamp_us(),
     foreach_shard(
         DB,
-        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
+        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts, Since) end
     ).
 
 -spec list_generations_with_lifetimes(emqx_ds:db()) ->
@@ -780,18 +786,18 @@ ra_store_batch(DB, Shard, Batch) ->
             {error, recoverable, Reason}
     end.
 
-ra_add_generation(DB, Shard) ->
+ra_add_generation(DB, Shard, Since) ->
     Command = #{
         ?tag => add_generation,
-        ?since => emqx_ds:timestamp_us()
+        ?since => Since
     },
     ra_command(DB, Shard, Command, 10).
 
-ra_update_config(DB, Shard, Opts) ->
+ra_update_config(DB, Shard, Opts, Since) ->
     Command = #{
         ?tag => update_config,
         ?config => Opts,
-        ?since => emqx_ds:timestamp_us()
+        ?since => Since
     },
     ra_command(DB, Shard, Command, 10).
 
@@ -910,10 +916,44 @@ ra_list_generations_with_lifetimes(DB, Shard) ->
 ra_drop_shard(DB, Shard) ->
     ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
 
+%% Ra Machine implementation
+%%
+%% This code decides how successfully replicated and committed log entries (e.g.
+%% commands) are applied to the shard storage state. This state logically comprises
+%% two parts:
+%% 1. RocksDB database managed through `emqx_ds_storage_layer`.
+%% 2. Machine state (`ra_state()`) that holds the minimal state needed to ensure
+%%    higher-level semantics, most importantly a strictly monotonic quasi-wallclock
+%%    timestamp used to assign unique message timestamps and fulfill the
+%%    "append-only" guarantee.
+%%
+%% There are a few subtleties in how the storage state is persisted and recovered.
+%% When the shard recovers from a shutdown or crash, this is what usually happens:
+%% 1. Shard storage layer starts up the RocksDB database.
+%% 2. Ra recovers the Raft log.
+%% 3. Ra recovers the latest machine snapshot (`ra_state()`), taken at some point
+%%    in time (`RaftIdx`).
+%% 4. Ra applies existing Raft log entries starting from `RaftIdx`.
+%%
+%% While the storage layer state, machine snapshot and log entries are consistent with
+%% each other most of the time, there are situations when they are not. Namely:
+%%  * RocksDB decides to flush memtables to disk by itself, which is unexpected but
+%%    possible.
+%%  * A lagging replica accepts a storage snapshot sourced from a RocksDB checkpoint,
+%%    and the RocksDB database is always implicitly flushed before checkpointing.
+%% In both of those cases, the Raft log would contain entries that were already
+%% applied from the point of view of the storage layer, and we must anticipate that.
 %%
+%% The process running the Ra machine also keeps auxiliary ephemeral state in the
+%% process dictionary; see the `?pd_ra_*` macro definitions for details.
 
+%% Index of the last yet unreleased Ra log entry.
 -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
+
+%% Approximate number of bytes occupied by yet unreleased Ra log entries.
 -define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
+
+%% Cached value of the `append_only` DS DB configuration setting.
 -define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic').
 
 -spec init(_Args :: map()) -> ra_state().
@@ -923,7 +963,7 @@ init(#{db := DB, shard := Shard}) ->
 -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
     {ra_state(), _Reply, _Effects}.
 apply(
-    RaftMeta,
+    RaftMeta = #{index := RaftIdx},
     Command = #{
         ?tag := ?BATCH,
         ?batch_operations := OperationsIn
@@ -931,26 +971,58 @@ apply(
     #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
 ) ->
     ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}),
-    Preconditions = maps:get(?batch_preconditions, Command, []),
     {Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn),
-    DispatchF = fun(Events) ->
-        emqx_ds_beamformer:shard_event({DB, Shard}, Events)
-    end,
-    %% FIXME
-    case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of
+    Preconditions = maps:get(?batch_preconditions, Command, []),
+    Admission =
+        case Preconditions of
+            [] ->
+                %% No preconditions.
+                true;
+            _ ->
+                %% Since preconditions are part of the Ra log, we need to be sure they
+                %% are applied perfectly idempotently, even when Ra log entries are
+                %% replayed on top of a more recent storage state that already had them
+                %% evaluated and applied before.
+                case read_storage_raidx(DBShard) of
+                    {ok, SRI} when RaftIdx > SRI ->
+                        emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions);
+                    {ok, SRI} when SRI >= RaftIdx ->
+                        %% This batch looks outdated relative to the storage layer state.
+                        false;
+                    {error, _, _} = Error ->
+                        Error
+                end
+        end,
+    %% Always advance latest timestamp nonetheless, so it won't diverge on replay.
+    State = State0#{latest := Latest},
+    set_ts(DBShard, Latest),
+    case Admission of
+        true ->
+            %% Plain batch, no preconditions.
+            Result = store_batch_nondurable(DBShard, Operations),
+            Effects = try_release_log(Stats, RaftMeta, State);
         ok ->
-            Result = emqx_ds_storage_layer:store_batch(
-                DBShard, Operations, #{durable => false}, DispatchF
-            ),
-            State = State0#{latest := Latest},
-            set_ts(DBShard, Latest),
+            %% Preconditions succeeded; record the applied Raft index in the storage layer.
+            Result = store_batch_nondurable(DBShard, Operations),
+            Result == ok andalso update_storage_raidx(DBShard, RaftIdx),
             Effects = try_release_log(Stats, RaftMeta, State);
+        Result = false ->
+            %% There are preconditions, but the batch looks outdated. Skip the batch.
+            %% This is a log replay; reply with `false`, no one expects the reply anyway.
+            Effects = [];
         PreconditionFailed = {precondition_failed, _} ->
+            %% Preconditions failed. Skip the batch.
             Result = {error, unrecoverable, PreconditionFailed},
-            State = State0,
             Effects = [];
-        Result ->
-            State = State0,
+        Result = {error, unrecoverable, Reason} ->
+            ?tp(error, "emqx_ds_replication_apply_batch_failed", #{
+                db => DB,
+                shard => Shard,
+                reason => Reason,
+                details =>
+                    "Unrecoverable error storing committed batch. Replicas may diverge. "
+                    "Consider rebuilding this shard replica from scratch."
+            }),
             Effects = []
     end,
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
@@ -1042,6 +1114,36 @@ force_monotonic_timestamps(DB) ->
     end,
     Flag.
 
+store_batch_nondurable(DBShard = {DB, Shard}, Operations) ->
+    DispatchF = fun(Events) ->
+        emqx_ds_beamformer:shard_event({DB, Shard}, Events)
+    end,
+    emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}, DispatchF).
+
+%% Latest Raft index tracking
+%%
+%% The latest RaIdx is kept as a global, essentially a KV pair in the default column
+%% family. Each update goes to the same spot in RocksDB, but this should not be a
+%% problem: writes are non-durable and issued only when preconditions are involved.
+
+-define(DSREPL_RAFTIDX, <<"dsrepl/ri">>).
+
+read_storage_raidx(DBShard) ->
+    case emqx_ds_storage_layer:fetch_global(DBShard, ?DSREPL_RAFTIDX) of
+        {ok, V} when byte_size(V) =< 8 ->
+            {ok, binary:decode_unsigned(V)};
+        not_found ->
+            {ok, 0};
+        Error ->
+            Error
+    end.
+
+update_storage_raidx(DBShard, RaftIdx) ->
+    KV = #{?DSREPL_RAFTIDX => binary:encode_unsigned(RaftIdx)},
+    ok = emqx_ds_storage_layer:store_global(DBShard, KV, #{durable => false}).
+
+%% Log truncation
+
 try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
     %% NOTE
     %% Because cursor release means storage flush (see

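The `apply/3` changes above gate precondition evaluation on the Raft index recorded by the storage layer, so entries replayed on top of a newer storage state are skipped. Below is a minimal, self-contained sketch of that admission decision; the module and function names are hypothetical and not part of the EMQX code base.

-module(precond_replay_sketch).
-export([admit/3, demo/0]).

%% EntryIdx: Raft index of the log entry being applied.
%% StorageIdx: Raft index last recorded by the storage layer (0 if none was recorded).
%% HasPreconditions: whether the batch carries preconditions.
admit(_EntryIdx, _StorageIdx, false) ->
    %% Plain batches are admitted unconditionally, mirroring the diff above.
    apply_batch;
admit(EntryIdx, StorageIdx, true) when EntryIdx > StorageIdx ->
    %% The storage layer has not seen this entry yet: evaluate the preconditions.
    verify_preconditions;
admit(_EntryIdx, _StorageIdx, true) ->
    %% The storage layer already reflects this entry (e.g. after an implicit RocksDB
    %% flush or a snapshot transfer): skip it on replay.
    skip_outdated.

demo() ->
    apply_batch = admit(7, 9, false),
    verify_preconditions = admit(10, 9, true),
    skip_outdated = admit(9, 9, true),
    ok.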
+ 135 - 1
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 
 -include("../../emqx/include/emqx.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -31,7 +32,7 @@
 
 opts(Config, Overrides) ->
     Layout = ?config(layout, Config),
-    maps:merge(
+    emqx_utils_maps:deep_merge(
         #{
             backend => builtin_raft,
             storage => Layout,
@@ -178,6 +179,139 @@ t_replication_transfers_snapshots(Config) ->
         []
     ).
 
+t_preconditions_idempotent(init, Config) ->
+    Apps = [appspec(ra), appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {t_preconditions_idempotent1, #{apps => Apps}},
+            {t_preconditions_idempotent2, #{apps => Apps}}
+        ],
+        #{work_dir => ?config(work_dir, Config)}
+    ),
+    Nodes = emqx_cth_cluster:start(Specs),
+    [{nodes, Nodes}, {specs, Specs} | Config];
+t_preconditions_idempotent('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).
+
+t_preconditions_idempotent(Config) ->
+    C1 = <<"C1">>,
+    Topic1 = <<"t/foo">>,
+    Topic2 = <<"t/bar/xyz">>,
+    Messages = [
+        message(C1, Topic1, <<"M0">>, 0),
+        message(C1, Topic2, <<"M0">>, 0),
+        message(C1, Topic1, <<"M1">>, 1),
+        message(C1, Topic2, <<"M1">>, 1),
+        message(C1, Topic1, <<"M2">>, 2),
+        message(C1, Topic2, <<"M2">>, 2),
+        message(C1, Topic1, <<"M100">>, 100)
+    ],
+    Batch1 = [
+        message(C1, Topic2, <<"M200">>, 200),
+        message(C1, Topic1, <<"M300">>, 300)
+    ],
+    Since1 = 350,
+    Since2 = 600,
+    Batch2 = #dsbatch{
+        preconditions = [
+            {if_exists, #message_matcher{from = C1, topic = Topic2, timestamp = 400, payload = '_'}}
+        ],
+        operations = [
+            message(C1, Topic1, <<"M5">>, 500)
+        ]
+    },
+    Batch3 = #dsbatch{
+        preconditions = [
+            {if_exists, #message_matcher{from = C1, topic = Topic1, timestamp = 100, payload = '_'}}
+        ],
+        operations = [
+            message(C1, Topic2, <<"M4">>, 400)
+        ]
+    },
+
+    Nodes = [N1, N2] = ?config(nodes, Config),
+    _Specs = [NS1, _] = ?config(specs, Config),
+    Opts = opts(Config, #{
+        n_shards => 1,
+        n_sites => 2,
+        replication_factor => 3,
+        append_only => false,
+        replication_options => #{
+            %% Make sure snapshots are taken eagerly.
+            snapshot_interval => 4
+        }
+    }),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            assert_db_open(Nodes, ?DB, Opts),
+
+            %% Store several messages.
+            [ok = ?ON(N2, emqx_ds:store_batch(?DB, [M], #{sync => true})) || M <- Messages],
+            ?assertEqual(
+                ok,
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch1, #{sync => true}))
+            ),
+
+            %% Add a generation. This will cause the storage layer to flush.
+            ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since1)),
+
+            %% Store batches with preconditions.
+            ?assertMatch(
+                %% No `{Topic2, _TS = 400}` message yet, should fail.
+                {error, _, {precondition_failed, _}},
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch2, #{sync => true}))
+            ),
+            ?assertEqual(
+                %% Only now `{Topic2, _TS = 400}` should be stored.
+                ok,
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch3, #{sync => true}))
+            ),
+
+            %% Restart N1 and wait until it is ready.
+            [N1] = emqx_cth_cluster:restart(NS1),
+            RestartedAt1 = erlang:monotonic_time(millisecond),
+            ok = ?ON(N1, emqx_ds:open_db(?DB, Opts)),
+            SinceRestarted1 = erlang:monotonic_time(millisecond) - RestartedAt1,
+            wait_db_bootstrapped([N1], ?DB, infinity, SinceRestarted1),
+
+            %% Both replicas should still contain the same set of messages.
+            [N1Msgs1, N2Msgs1] = ?ON(
+                Nodes,
+                emqx_ds_test_helpers:storage_consume({?DB, <<"0">>}, ['#'])
+            ),
+            emqx_ds_test_helpers:assert_same_set(N1Msgs1, N2Msgs1),
+
+            %% Add one more generation; idempotency should still hold when it's
+            %% the last log entry.
+            ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since2)),
+
+            %% Restart N1 and wait until it is ready.
+            [N1] = emqx_cth_cluster:restart(NS1),
+            RestartedAt2 = erlang:monotonic_time(millisecond),
+            ok = ?ON(N1, emqx_ds:open_db(?DB, Opts)),
+            SinceRestarted2 = erlang:monotonic_time(millisecond) - RestartedAt2,
+            wait_db_bootstrapped([N1], ?DB, infinity, SinceRestarted2),
+
+            %% But both replicas should still contain the same set of messages.
+            [N1Msgs2, N2Msgs2] = ?ON(
+                Nodes,
+                emqx_ds_test_helpers:storage_consume({?DB, <<"0">>}, ['#'])
+            ),
+            emqx_ds_test_helpers:assert_same_set(N1Msgs2, N2Msgs2)
+        end,
+        fun(Trace) ->
+            %% Expect Raft log entries following `add_generation/2` to be reapplied
+            %% twice, once per restart.
+            Events = ?of_kind(ds_ra_apply_batch, ?of_node(N1, Trace)),
+            ?assertMatch(
+                % Batch2, Batch3, Batch2, Batch3, Batch2, Batch3
+                [_, _, _, _, _, _],
+                [E || E = #{latest := L} <- Events, L > Since1]
+            )
+        end
+    ).
+
 t_rebalance(init, Config) ->
     Apps = [appspec(ra), appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(

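The test suite now builds DB options with `emqx_utils_maps:deep_merge/2` instead of `maps:merge/2`, so nested overrides such as `replication_options` are merged into the defaults rather than replacing them wholesale. A small illustration with a hypothetical `deep_merge/2` of the same shape (the option keys below are illustrative, not taken from the suite's defaults):

-module(merge_sketch).
-export([deep_merge/2, demo/0]).

%% Recursively merge Overrides into Base; nested maps are merged key by key.
deep_merge(Base, Overrides) ->
    maps:fold(
        fun(K, V, Acc) ->
            case Acc of
                #{K := Prev} when is_map(Prev), is_map(V) ->
                    Acc#{K := deep_merge(Prev, V)};
                _ ->
                    Acc#{K => V}
            end
        end,
        Base,
        Overrides
    ).

demo() ->
    Base = #{n_shards => 2, replication_options => #{wal_max_size_bytes => 1 bsl 20}},
    Overrides = #{replication_options => #{snapshot_interval => 4}},
    %% maps:merge/2 replaces the nested map wholesale, dropping wal_max_size_bytes:
    #{replication_options := RO1} = maps:merge(Base, Overrides),
    false = maps:is_key(wal_max_size_bytes, RO1),
    %% A deep merge keeps sibling keys inside the nested map:
    #{replication_options := #{snapshot_interval := 4, wal_max_size_bytes := _}} =
        deep_merge(Base, Overrides),
    ok.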
+ 4 - 0
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -245,6 +245,10 @@ topic_key(Trie, ThresholdFun, Tokens) ->
 
%% @doc Return an existing topic key if it exists.
 -spec lookup_topic_key(trie(), [level()]) -> {ok, msg_storage_key()} | undefined.
+lookup_topic_key(Trie, [<<"$", _/bytes>> | _] = Tokens) ->
+    %% [MQTT-4.7.2-1]
+    %% See also `match_topics/2`.
+    do_lookup_topic_key(Trie, ?PREFIX_SPECIAL, Tokens, []);
 lookup_topic_key(Trie, Tokens) ->
     do_lookup_topic_key(Trie, ?PREFIX, Tokens, []).
 

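The new `lookup_topic_key/2` clause above routes topics whose first level starts with `$` to a separate trie root, in line with [MQTT-4.7.2-1]: topic filters starting with a wildcard must not match topic names beginning with `$`. A toy sketch of that separation (not the EMQX trie; names are illustrative):

-module(dollar_topics_sketch).
-export([root_for/1, matches_hash_all/1, demo/0]).

%% Pick the trie root for a tokenized topic.
root_for([<<"$", _/bytes>> | _]) -> prefix_special;
root_for(_Tokens) -> prefix_normal.

%% A bare "#" filter matches a topic only if it lives under the normal root.
matches_hash_all(TopicTokens) ->
    root_for(TopicTokens) =:= prefix_normal.

demo() ->
    prefix_special = root_for([<<"$SYS">>, <<"brokers">>]),
    prefix_normal = root_for([<<"t">>, <<"foo">>]),
    false = matches_hash_all([<<"$SYS">>, <<"brokers">>]),
    true = matches_hash_all([<<"t">>, <<"foo">>]),
    ok.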
+ 35 - 11
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -607,12 +607,14 @@ lookup_message(
     S = #s{db = DB, data = CF},
     #message_matcher{topic = Topic, timestamp = Timestamp}
 ) ->
-    {Key, _} = make_key(S, Timestamp, Topic),
-    case rocksdb:get(DB, CF, Key, _ReadOpts = []) of
+    Key = lookup_key(S, Timestamp, Topic),
+    case is_binary(Key) andalso rocksdb:get(DB, CF, Key, _ReadOpts = []) of
         {ok, Blob} ->
             deserialize(Blob);
         not_found ->
             not_found;
+        false ->
+            not_found;
         Error ->
             {error, unrecoverable, {rocksdb, Error}}
     end.
@@ -921,21 +923,43 @@ format_key(KeyMapper, Key) ->
 make_key(#s{keymappers = KeyMappers, trie = Trie, threshold_fun = TFun}, Timestamp, Topic) ->
     Tokens = emqx_topic:words(Topic),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
+    KeyBin = make_key(KeyMappers, Timestamp, TopicIndex, Varying),
+    {KeyBin, Varying}.
+
+-spec lookup_key(s(), emqx_ds:time(), emqx_types:topic()) -> binary() | undefined.
+lookup_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
+    Tokens = emqx_topic:words(Topic),
+    case emqx_ds_lts:lookup_topic_key(Trie, Tokens) of
+        {ok, {TopicIndex, Varying}} ->
+            make_key(KeyMappers, Timestamp, TopicIndex, Varying);
+        undefined ->
+            undefined
+    end.
+
+-spec make_key(
+    array:array(emqx_ds_bitmask_keymapper:keymapper()),
+    emqx_ds:time(),
+    emqx_ds_lts:static_key(),
+    [emqx_ds_lts:varying()]
+) ->
+    binary().
+make_key(KeyMappers, Timestamp, TopicIndex, Varying) ->
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),
-    KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes),
-    {KeyBin, Varying}.
+    map_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes).
 
--spec make_key(emqx_ds_bitmask_keymapper:keymapper(), emqx_ds_lts:static_key(), emqx_ds:time(), [
-    non_neg_integer()
-]) ->
+-spec map_key(
+    emqx_ds_bitmask_keymapper:keymapper(),
+    emqx_ds_lts:static_key(),
+    emqx_ds:time(),
+    [non_neg_integer()]
+) ->
     binary().
-make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
+map_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
+    Vector = [TopicIndex, Timestamp | Varying],
     emqx_ds_bitmask_keymapper:key_to_bitstring(
         KeyMapper,
-        emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
-            TopicIndex, Timestamp | Varying
-        ])
+        emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, Vector)
     ).
 
 hash_topic_level('') ->

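The refactor above splits key construction into a write path (`make_key/3`, which may extend the LTS trie for topics it has not seen) and a read path (the new `lookup_key/3`, which only consults the trie and returns `undefined` for unknown topics, so `lookup_message/3` can answer `not_found` without mutating anything). A toy sketch of that split, using a plain map in place of the trie and a simplified key encoding (hypothetical module, not the EMQX keymapper):

-module(key_path_sketch).
-export([write_key/3, lookup_key/3, demo/0]).

%% Write path: allocate an index for unseen topics (may extend the "trie").
write_key(Trie, Topic, Timestamp) ->
    case Trie of
        #{Topic := Index} ->
            {encode(Index, Timestamp), Trie};
        #{} ->
            Index = maps:size(Trie),
            {encode(Index, Timestamp), Trie#{Topic => Index}}
    end.

%% Read path: never mutate; an unknown topic means the message cannot exist.
lookup_key(Trie, Topic, Timestamp) ->
    case Trie of
        #{Topic := Index} -> encode(Index, Timestamp);
        #{} -> undefined
    end.

encode(Index, Timestamp) ->
    <<Index:16, Timestamp:64>>.

demo() ->
    {Key, Trie} = write_key(#{}, <<"t/foo">>, 1),
    Key = lookup_key(Trie, <<"t/foo">>, 1),
    undefined = lookup_key(Trie, <<"t/unknown">>, 1),
    ok.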
+ 133 - 12
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -52,6 +52,11 @@
     add_generation/2,
     list_generations_with_lifetimes/1,
     drop_generation/2,
+    find_generation/2,
+
+    %% Globals
+    store_global/3,
+    fetch_global/2,
 
     %% Snapshotting
     flush/1,
@@ -63,7 +68,15 @@
 ]).
 
 %% gen_server
--export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([
+    init/1,
+    format_status/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
 
 %% internal exports:
 -export([db_dir/1, base_dir/0]).
@@ -128,6 +141,12 @@
 -type cf_refs() :: [cf_ref()].
 
 -type gen_id() :: 0..16#ffff.
+-type gen_info() :: #{
+    created_at := emqx_ds:time(),
+    since := emqx_ds:time(),
+    until := undefined | emqx_ds:time(),
+    _ => _
+}.
 
 -type batch() :: [
     {emqx_ds:time(), emqx_types:message()}
@@ -228,7 +247,9 @@
     %% This data is used to create new generation:
     prototype := prototype(),
     %% Generations:
-    ?GEN_KEY(gen_id()) => GenData
+    ?GEN_KEY(gen_id()) => GenData,
+    %% DB handle (runtime only).
+    db => rocksdb:db_handle()
 }.
 
 %% Shard schema (persistent):
@@ -237,6 +258,8 @@
 %% Shard (runtime):
 -type shard() :: shard(generation()).
 
+-define(GLOBAL(K), <<"G/", K/binary>>).
+
 -type options() :: map().
 
 -type poll_iterators() :: [{_UserData, iterator()}].
@@ -662,6 +685,54 @@ delete_next(
             {ok, end_of_stream}
     end.
 
+%% @doc Persist a set of key/value pairs in the storage globally, i.e. outside of any
+%% specific generation. Once persisted, values can be read back by calling
+%% `fetch_global/2`.
+%%
+%% Adding or dropping generations won't affect persisted key/value pairs, hence the
+%% purpose: keep state that needs to be tied to the shard itself and outlive any
+%% generation.
+%%
+%% This operation is idempotent: previous values associated with existing keys are
+%% overwritten. While atomicity can be requested explicitly through the `atomic`
+%% option, the operation is atomic by default: either all of the pairs are persisted,
+%% or none at all. Writes are durable by default, but this is optional; see
+%% `batch_store_opts()` for details.
+-spec store_global(shard_id(), _KVs :: #{binary() => binary()}, batch_store_opts()) ->
+    ok | emqx_ds:error(_).
+store_global(ShardId, KVs, Options) ->
+    #{db := DB} = get_schema_runtime(ShardId),
+    {ok, Batch} = rocksdb:batch(),
+    try
+        ok = maps:foreach(fun(K, V) -> rocksdb:batch_put(Batch, ?GLOBAL(K), V) end, KVs),
+        WriteOpts = [{disable_wal, not maps:get(durable, Options, true)}],
+        Result = rocksdb:write_batch(DB, Batch, WriteOpts),
+        case Result of
+            ok ->
+                ok;
+            {error, {error, Reason}} ->
+                {error, unrecoverable, {rocksdb, Reason}}
+        end
+    after
+        rocksdb:release_batch(Batch)
+    end.
+
+%% @doc Retrieve the value for a single key previously written to the storage by
+%% `store_global/3`.
+-spec fetch_global(shard_id(), _Key :: binary()) ->
+    {ok, _Value :: binary()} | not_found | emqx_ds:error(_).
+fetch_global(ShardId, K) ->
+    #{db := DB} = get_schema_runtime(ShardId),
+    Result = rocksdb:get(DB, ?GLOBAL(K), _ReadOpts = []),
+    case Result of
+        {ok, _} ->
+            Result;
+        not_found ->
+            Result;
+        {error, Reason} ->
+            {error, unrecoverable, {rocksdb, Reason}}
+    end.
+
 -spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
     ok | {error, overlaps_existing_generations}.
 update_config(ShardId, Since, Options) ->
@@ -684,13 +755,7 @@ lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) ->
     end.
 
 -spec list_generations_with_lifetimes(shard_id()) ->
-    #{
-        gen_id() => #{
-            created_at := emqx_ds:time(),
-            since := emqx_ds:time(),
-            until := undefined | emqx_ds:time()
-        }
-    }.
+    #{gen_id() => gen_info()}.
 list_generations_with_lifetimes(ShardId) ->
     gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
 
@@ -698,6 +763,15 @@ list_generations_with_lifetimes(ShardId) ->
 drop_generation(ShardId, GenId) ->
     gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
 
+-spec find_generation(shard_id(), current | _At :: emqx_ds:time()) ->
+    {gen_id(), gen_info()} | not_found.
+find_generation(ShardId, current) ->
+    GenId = generation_current(ShardId),
+    GenData = #{} = generation_get(ShardId, GenId),
+    {GenId, GenData};
+find_generation(ShardId, AtTime) ->
+    generation_at(ShardId, AtTime).
+
 -spec shard_info(shard_id(), status) -> running | down.
 shard_info(ShardId, status) ->
     try get_schema_runtime(ShardId) of
@@ -774,7 +848,53 @@ init({ShardId, Options}) ->
     },
     commit_metadata(S),
     ?tp(debug, ds_storage_init_state, #{shard => ShardId, s => S}),
-    {ok, S}.
+    {ok, S, {continue, clean_orphans}}.
+
+handle_continue(
+    clean_orphans,
+    S = #s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema}
+) ->
+    %% Adding / dropping a generation is not transactional.
+    %% This means that the storage may contain "orphaned" column families, i.e.
+    %% column families that do not belong to a live generation. We need to clean
+    %% them up, because an attempt to create an existing column family is an error,
+    %% which would make `add_generation/2` non-idempotent. Cleaning up seems to be
+    %% safe: either it's an unfinished `handle_add_generation/2`, meaning the CFs are
+    %% empty, or an unfinished `handle_drop_generation/2`, meaning the CFs were meant
+    %% to be dropped anyway.
+    CFNames = maps:fold(
+        fun
+            (?GEN_KEY(_), #{cf_names := GenCFNames}, Acc) ->
+                GenCFNames ++ Acc;
+            (_Prop, _, Acc) ->
+                Acc
+        end,
+        [],
+        Schema
+    ),
+    OrphanedCFRefs = lists:foldl(fun proplists:delete/2, CFRefs, CFNames),
+    case OrphanedCFRefs of
+        [] ->
+            {noreply, S};
+        [_ | _] ->
+            lists:foreach(
+                fun({CFName, CFHandle}) ->
+                    Result = rocksdb:drop_column_family(DB, CFHandle),
+                    ?tp(
+                        warning,
+                        ds_storage_layer_dropped_orphaned_column_family,
+                        #{
+                            shard => ShardId,
+                            orphan => CFName,
+                            result => Result,
+                            s => format_state(S)
+                        }
+                    )
+                end,
+                OrphanedCFRefs
+            ),
+            {noreply, S#s{cf_refs = CFRefs -- OrphanedCFRefs}}
+    end.
 
 format_status(Status) ->
     maps:map(
@@ -857,7 +977,7 @@ clear_all_checkpoints(ShardId) ->
     shard().
 open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     %% Transform generation schemas to generation runtime data:
-    maps:map(
+    Shard = maps:map(
         fun
             (?GEN_KEY(GenId), GenSchema) ->
                 open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
@@ -865,7 +985,8 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
                 Val
         end,
         ShardSchema
-    ).
+    ),
+    Shard#{db => DB}.
 
 -spec handle_add_generation(server_state(), emqx_ds:time()) ->
     server_state() | {error, overlaps_existing_generations}.

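A short usage sketch of the globals API added above. The `demo_globals/1` helper is hypothetical; it assumes `ShardId` identifies an already-open shard, and the key mirrors the `dsrepl/ri` global used by the replication layer, written non-durably just as the replication layer does.

%% Record a Raft index as a global and read it back.
demo_globals(ShardId) ->
    RaftIdx = 42,
    ok = emqx_ds_storage_layer:store_global(
        ShardId,
        #{<<"dsrepl/ri">> => binary:encode_unsigned(RaftIdx)},
        #{durable => false}
    ),
    %% binary:encode_unsigned(42) =:= <<42>>, so the value reads back as a 1-byte binary:
    {ok, <<42>>} = emqx_ds_storage_layer:fetch_global(ShardId, <<"dsrepl/ri">>),
    not_found = emqx_ds_storage_layer:fetch_global(ShardId, <<"no-such-key">>),
    ok.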
+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -414,7 +414,7 @@ lookup_message(
                 not_found ->
                     not_found;
                 {error, Reason} ->
-                    {error, unrecoverable, Reason}
+                    {error, unrecoverable, {rocksdb, Reason}}
             end;
         undefined ->
             not_found

+ 1 - 0
changes/ee/fix-14229.en.md

@@ -0,0 +1 @@
+Fixed a couple of issues in the implementation of the Raft/RocksDB backend for Durable Storage that could, in some cases, negatively impact correctness and replica convergence of the internal DBs used by Durable Shared Subscriptions.