
fix(dsrepl): ensure preconditions are idempotent under log replays

When log replay is involved, preconditions should work as if they
observe the storage state at the specific point corresponding to
the Raft index, without the effects of any subsequent log entries
that may have already been applied to the storage state before.

Because this is impossible with how we currently manage storage state
and snapshots, emulate it by keeping track of the latest Raft index in
the storage itself, and skip any batches involving preconditions that
appear to have already been applied, as determined by reading the
latest Raft index record on apply.
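
In essence, the apply-time check boils down to comparing the entry's
Raft index against the index recorded in the storage. A minimal sketch
of that guard follows (`maybe_verify_preconditions/3` is an illustrative
name; the actual change lives in `emqx_ds_replication_layer:apply/3` in
the diff below):

    %% Sketch: evaluate preconditions only for entries newer than the
    %% Raft index recorded in the storage layer; otherwise treat the
    %% entry as already applied and skip it.
    maybe_verify_preconditions(DBShard, RaftIdx, Preconditions) ->
        case read_storage_raidx(DBShard) of
            {ok, StorageIdx} when RaftIdx > StorageIdx ->
                %% Entry is newer than the storage state.
                emqx_ds_precondition:verify(
                    emqx_ds_storage_layer, DBShard, Preconditions
                );
            {ok, _StorageIdx} ->
                %% Already applied before (e.g. after a snapshot
                %% transfer or an implicit RocksDB flush): skip.
                false;
            {error, _, _} = Error ->
                Error
        end.
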
Andrew Mayorov 1 year ago
Parent
Commit
ba764a3c86

+ 153 - 19
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -14,6 +14,7 @@
     open_db/2,
     close_db/1,
     add_generation/1,
+    add_generation/2,
     update_db_config/2,
     list_generations_with_lifetimes/1,
     drop_generation/2,
@@ -212,17 +213,22 @@ close_db(DB) ->
 
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
+    add_generation(DB, emqx_ds:timestamp_us()).
+
+-spec add_generation(emqx_ds:db(), emqx_ds:time()) -> ok | {error, _}.
+add_generation(DB, Since) ->
     foreach_shard(
         DB,
-        fun(Shard) -> ok = ra_add_generation(DB, Shard) end
+        fun(Shard) -> ok = ra_add_generation(DB, Shard, Since) end
     ).
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 update_db_config(DB, CreateOpts) ->
     Opts = #{} = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
+    Since = emqx_ds:timestamp_us(),
     foreach_shard(
         DB,
-        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
+        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts, Since) end
     ).
 
 -spec list_generations_with_lifetimes(emqx_ds:db()) ->
@@ -780,18 +786,18 @@ ra_store_batch(DB, Shard, Batch) ->
             {error, recoverable, Reason}
     end.
 
-ra_add_generation(DB, Shard) ->
+ra_add_generation(DB, Shard, Since) ->
     Command = #{
         ?tag => add_generation,
-        ?since => emqx_ds:timestamp_us()
+        ?since => Since
     },
     ra_command(DB, Shard, Command, 10).
 
-ra_update_config(DB, Shard, Opts) ->
+ra_update_config(DB, Shard, Opts, Since) ->
     Command = #{
         ?tag => update_config,
         ?config => Opts,
-        ?since => emqx_ds:timestamp_us()
+        ?since => Since
     },
     ra_command(DB, Shard, Command, 10).
 
@@ -910,10 +916,44 @@ ra_list_generations_with_lifetimes(DB, Shard) ->
 ra_drop_shard(DB, Shard) ->
     ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
 
+%% Ra Machine implementation
+%%
+%% This code decides how successfully replicated and committed log entries (e.g.
+%% commands) are applied to the shard storage state. Logically, this state is
+%% composed of two parts:
+%% 1. RocksDB database managed through `emqx_ds_storage_layer`.
+%% 2. Machine state (`ra_state()`) that holds the minimal state needed to ensure
+%%    higher-level semantics, most importantly a strictly monotonic quasi-wallclock
+%%    timestamp used to assign unique message timestamps and fulfill "append-only"
+%%    guarantees.
+%%
+%% There are a few subtleties in how storage state is persisted and recovered.
+%% When the shard recovers from a shutdown or crash, this is what usually happens:
+%% 1. Shard storage layer starts up the RocksDB database.
+%% 2. Ra recovers the Raft log.
+%% 3. Ra recovers the latest machine snapshot (`ra_state()`), taken at some point
+%%    in time (`RaftIdx`).
+%% 4. Ra applies existing Raft log entries starting from `RaftIdx`.
 %%
+%% While most of the time the storage layer state, machine snapshot and log entries
+%% are consistent with each other, there are situations when they are not. Namely:
+%%  * RocksDB decides to flush memtables to disk by itself, which is unexpected but
+%%    possible.
+%%  * A lagging replica accepts a storage snapshot sourced from a RocksDB checkpoint,
+%%    and the RocksDB database is always implicitly flushed before checkpointing.
+%% In both of those cases, the Raft log would contain entries that were already
+%% applied from the point of view of the storage layer, and we must anticipate that.
+%%
+%% The process running the Ra machine also keeps auxiliary ephemeral state in the
+%% process dictionary, see `?pd_ra_*` macrodefs for details.
 
+%% Index of the last yet unreleased Ra log entry.
 -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
+
+%% Approximate number of bytes occupied by yet unreleased Ra log entries.
 -define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
+
+%% Cached value of the `append_only` DS DB configuration setting.
 -define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic').
 
 -spec init(_Args :: map()) -> ra_state().
@@ -923,7 +963,7 @@ init(#{db := DB, shard := Shard}) ->
 -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
     {ra_state(), _Reply, _Effects}.
 apply(
-    RaftMeta,
+    RaftMeta = #{index := RaftIdx},
     Command = #{
         ?tag := ?BATCH,
         ?batch_operations := OperationsIn
@@ -931,32 +971,69 @@ apply(
     #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
 ) ->
     ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}),
-    Preconditions = maps:get(?batch_preconditions, Command, []),
     {Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn),
-    DispatchF = fun(Events) ->
-        emqx_ds_beamformer:shard_event({DB, Shard}, Events)
-    end,
-    %% FIXME
-    case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of
+    Preconditions = maps:get(?batch_preconditions, Command, []),
+    Admission =
+        case Preconditions of
+            [] ->
+                %% No preconditions.
+                true;
+            _ ->
+                %% Since preconditions are part of the Ra log, we need to be sure they
+                %% are applied perfectly idempotently, even when Ra log entries are
+                %% replayed on top of a more recent storage state that already had them
+                %% evaluated and applied before.
+                case read_storage_raidx(DBShard) of
+                    {ok, SRI} when RaftIdx > SRI ->
+                        emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions);
+                    {ok, SRI} when SRI >= RaftIdx ->
+                        %% This batch looks outdated relative to the storage layer state.
+                        false;
+                    {error, _, _} = Error ->
+                        Error
+                end
+        end,
+    case Admission of
+        true ->
+            %% Plain batch, no preconditions.
+            Result = store_batch_nondurable(DBShard, Operations),
+            State = State0#{latest := Latest},
+            set_ts(DBShard, Latest),
+            Effects = try_release_log(Stats, RaftMeta, State);
         ok ->
-            Result = emqx_ds_storage_layer:store_batch(
-                DBShard, Operations, #{durable => false}, DispatchF
-            ),
+            %% Preconditions succeeded, need to persist `Latest` in the storage layer.
+            UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
+            Result = store_batch_nondurable(DBShard, UpdateOps ++ Operations),
             State = State0#{latest := Latest},
             set_ts(DBShard, Latest),
             Effects = try_release_log(Stats, RaftMeta, State);
+        Result = false ->
+            %% There are preconditions, but the batch looks outdated. Skip the batch.
+            %% This is a log replay; reply with `false`, no one expects the reply anyway.
+            State = State0#{latest := Latest},
+            set_ts(DBShard, Latest),
+            Effects = [];
         PreconditionFailed = {precondition_failed, _} ->
+            %% Preconditions failed. Skip the batch.
             Result = {error, unrecoverable, PreconditionFailed},
             State = State0,
             Effects = [];
-        Result ->
+        Result = {error, unrecoverable, Reason} ->
+            ?tp(error, "emqx_ds_replication_apply_batch_failed", #{
+                db => DB,
+                shard => Shard,
+                reason => Reason,
+                details =>
+                    "Unrecoverable error storing committed batch. Replicas may diverge. "
+                    "Consider rebuilding this shard replica from scratch."
+            }),
             State = State0,
             Effects = []
     end,
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
     {State, Result, Effects};
 apply(
-    RaftMeta,
+    RaftMeta = #{index := RaftIdx},
     #{?tag := add_generation, ?since := Since},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
@@ -970,13 +1047,15 @@ apply(
     ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
+    UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
+    _Ok = emqx_ds_storage_layer:store_batch(DBShard, UpdateOps, #{durable => false}),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
     Effects = release_log(RaftMeta, State),
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
     {State, Result, Effects};
 apply(
-    RaftMeta,
+    RaftMeta = #{index := RaftIdx},
     #{?tag := update_config, ?since := Since, ?config := Opts},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
@@ -991,6 +1070,8 @@ apply(
     ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
+    UpdateOps = mk_update_storage_raidx(DBShard, Latest, RaftIdx),
+    _Ok = emqx_ds_storage_layer:store_batch(DBShard, UpdateOps, #{durable => false}),
     State = State0#{latest := Latest},
     Effects = release_log(RaftMeta, State),
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
@@ -1042,6 +1123,59 @@ force_monotonic_timestamps(DB) ->
     end,
     Flag.
 
+store_batch_nondurable(DBShard = {DB, Shard}, Operations) ->
+    DispatchF = fun(Events) ->
+        emqx_ds_beamformer:shard_event({DB, Shard}, Events)
+    end,
+    emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}, DispatchF).
+
+%% Latest Raft index tracking
+%%
+%% The latest RaIdx is kept in a special topic, invisible from the outside, at the
+%% timestamp matching the start of the latest storage generation. Each update goes to
+%% the same spot in RocksDB, but this should not be a problem: writes are non-durable
+%% and issued only when preconditions are involved.
+
+-define(DSREPL_TOPIC(ENTRY), <<"$", 0:8, "/", (ENTRY):8>>).
+-define(DSREPL_LATEST, 1).
+
+read_storage_raidx(DBShard) ->
+    {_Id, #{since := Since}} = emqx_ds_storage_layer:find_generation(DBShard, current),
+    Matcher = #message_matcher{
+        from = <<>>,
+        topic = ?DSREPL_TOPIC(?DSREPL_LATEST),
+        timestamp = Since,
+        payload = '_'
+    },
+    case emqx_ds_storage_layer:lookup_message(DBShard, Matcher) of
+        #message{payload = Payload} when byte_size(Payload) =< 8 ->
+            {ok, binary:decode_unsigned(Payload)};
+        not_found ->
+            {ok, 0};
+        #message{payload = Payload} ->
+            {error, unrecoverable, {unexpected, Payload}};
+        {error, unrecoverable, _Reason} = Error ->
+            Error
+    end.
+
+mk_update_storage_raidx(DBShard, Latest, RaIdx) ->
+    case emqx_ds_storage_layer:find_generation(DBShard, Latest) of
+        {_Id, #{since := Since}} ->
+            [
+                {Since, #message{
+                    id = <<>>,
+                    from = <<>>,
+                    topic = ?DSREPL_TOPIC(?DSREPL_LATEST),
+                    timestamp = Since,
+                    payload = binary:encode_unsigned(RaIdx)
+                }}
+            ];
+        not_found ->
+            []
+    end.
+
+%% Log truncation
+
 try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
     %% NOTE
     %% Because cursor release means storage flush (see

+ 135 - 1
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 
 -include("../../emqx/include/emqx.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -31,7 +32,7 @@
 
 opts(Config, Overrides) ->
     Layout = ?config(layout, Config),
-    maps:merge(
+    emqx_utils_maps:deep_merge(
         #{
             backend => builtin_raft,
             storage => Layout,
@@ -178,6 +179,139 @@ t_replication_transfers_snapshots(Config) ->
         []
     ).
 
+t_preconditions_idempotent(init, Config) ->
+    Apps = [appspec(ra), appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {t_preconditions_idempotent1, #{apps => Apps}},
+            {t_preconditions_idempotent2, #{apps => Apps}}
+        ],
+        #{work_dir => ?config(work_dir, Config)}
+    ),
+    Nodes = emqx_cth_cluster:start(Specs),
+    [{nodes, Nodes}, {specs, Specs} | Config];
+t_preconditions_idempotent('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).
+
+t_preconditions_idempotent(Config) ->
+    C1 = <<"C1">>,
+    Topic1 = <<"t/foo">>,
+    Topic2 = <<"t/bar/xyz">>,
+    Messages = [
+        message(C1, Topic1, <<"M0">>, 0),
+        message(C1, Topic2, <<"M0">>, 0),
+        message(C1, Topic1, <<"M1">>, 1),
+        message(C1, Topic2, <<"M1">>, 1),
+        message(C1, Topic1, <<"M2">>, 2),
+        message(C1, Topic2, <<"M2">>, 2),
+        message(C1, Topic1, <<"M100">>, 100)
+    ],
+    Batch1 = [
+        message(C1, Topic2, <<"M200">>, 200),
+        message(C1, Topic1, <<"M300">>, 300)
+    ],
+    Since1 = 350,
+    Since2 = 600,
+    Batch2 = #dsbatch{
+        preconditions = [
+            {if_exists, #message_matcher{from = C1, topic = Topic2, timestamp = 400, payload = '_'}}
+        ],
+        operations = [
+            message(C1, Topic1, <<"M5">>, 500)
+        ]
+    },
+    Batch3 = #dsbatch{
+        preconditions = [
+            {if_exists, #message_matcher{from = C1, topic = Topic1, timestamp = 100, payload = '_'}}
+        ],
+        operations = [
+            message(C1, Topic2, <<"M4">>, 400)
+        ]
+    },
+
+    Nodes = [N1, N2] = ?config(nodes, Config),
+    _Specs = [NS1, _] = ?config(specs, Config),
+    Opts = opts(Config, #{
+        n_shards => 1,
+        n_sites => 2,
+        replication_factor => 3,
+        append_only => false,
+        replication_options => #{
+            %% Make sure snapshots are taken eagerly.
+            snapshot_interval => 4
+        }
+    }),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            assert_db_open(Nodes, ?DB, Opts),
+
+            %% Store several messages.
+            [ok = ?ON(N2, emqx_ds:store_batch(?DB, [M], #{sync => true})) || M <- Messages],
+            ?assertEqual(
+                ok,
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch1, #{sync => true}))
+            ),
+
+            %% Add a generation. This will cause the storage layer to flush.
+            ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since1)),
+
+            %% Store batches with preconditions.
+            ?assertMatch(
+                %% No `{Topic2, _TS = 400}` message yet, should fail.
+                {error, _, {precondition_failed, _}},
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch2, #{sync => true}))
+            ),
+            ?assertEqual(
+                %% Only now `{Topic2, _TS = 400}` should be stored.
+                ok,
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch3, #{sync => true}))
+            ),
+
+            %% Restart N1 and wait until it is ready.
+            [N1] = emqx_cth_cluster:restart(NS1),
+            RestartedAt1 = erlang:monotonic_time(millisecond),
+            ok = ?ON(N1, emqx_ds:open_db(?DB, Opts)),
+            SinceRestarted1 = erlang:monotonic_time(millisecond) - RestartedAt1,
+            wait_db_bootstrapped([N1], ?DB, infinity, SinceRestarted1),
+
+            %% Both replicas should still contain the same set of messages.
+            [N1Msgs1, N2Msgs1] = ?ON(
+                Nodes,
+                emqx_ds_test_helpers:storage_consume({?DB, <<"0">>}, ['#'])
+            ),
+            emqx_ds_test_helpers:assert_same_set(N1Msgs1, N2Msgs1),
+
+            %% Add one more generation, idempotency should still hold if it's
+            %% the last log entry.
+            ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since2)),
+
+            %% Restart N1 and wait until it is ready.
+            [N1] = emqx_cth_cluster:restart(NS1),
+            RestartedAt2 = erlang:monotonic_time(millisecond),
+            ok = ?ON(N1, emqx_ds:open_db(?DB, Opts)),
+            SinceRestarted2 = erlang:monotonic_time(millisecond) - RestartedAt2,
+            wait_db_bootstrapped([N1], ?DB, infinity, SinceRestarted2),
+
+            %% But both replicas should still contain the same set of messages.
+            [N1Msgs2, N2Msgs2] = ?ON(
+                Nodes,
+                emqx_ds_test_helpers:storage_consume({?DB, <<"0">>}, ['#'])
+            ),
+            emqx_ds_test_helpers:assert_same_set(N1Msgs2, N2Msgs2)
+        end,
+        fun(Trace) ->
+            %% Expect Raft log entries following `add_generation/2` to be reapplied
+            %% twice, once per restart.
+            Events = ?of_kind(ds_ra_apply_batch, ?of_node(N1, Trace)),
+            ?assertMatch(
+                % Batch2, Batch3, Batch2, Batch3, Batch2, Batch3
+                [_, _, _, _, _, _],
+                [E || E = #{latest := L} <- Events, L > Since1]
+            )
+        end
+    ).
+
 t_rebalance(init, Config) ->
     Apps = [appspec(ra), appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(

+ 17 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -52,6 +52,7 @@
     add_generation/2,
     list_generations_with_lifetimes/1,
     drop_generation/2,
+    find_generation/2,
 
     %% Snapshotting
     flush/1,
@@ -128,6 +129,12 @@
 -type cf_refs() :: [cf_ref()].
 
 -type gen_id() :: 0..16#ffff.
+-type gen_info() :: #{
+    created_at := emqx_ds:time(),
+    since := emqx_ds:time(),
+    until := undefined | emqx_ds:time(),
+    _ => _
+}.
 
 -type batch() :: [
     {emqx_ds:time(), emqx_types:message()}
@@ -684,13 +691,7 @@ lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) ->
     end.
 
 -spec list_generations_with_lifetimes(shard_id()) ->
-    #{
-        gen_id() => #{
-            created_at := emqx_ds:time(),
-            since := emqx_ds:time(),
-            until := undefined | emqx_ds:time()
-        }
-    }.
+    #{gen_id() => gen_info()}.
 list_generations_with_lifetimes(ShardId) ->
     gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
 
@@ -698,6 +699,15 @@ list_generations_with_lifetimes(ShardId) ->
 drop_generation(ShardId, GenId) ->
     gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
 
+-spec find_generation(shard_id(), current | _At :: emqx_ds:time()) ->
+    {gen_id(), gen_info()} | not_found.
+find_generation(ShardId, current) ->
+    GenId = generation_current(ShardId),
+    GenData = #{} = generation_get(ShardId, GenId),
+    {GenId, GenData};
+find_generation(ShardId, AtTime) ->
+    generation_at(ShardId, AtTime).
+
 -spec shard_info(shard_id(), status) -> running | down.
 shard_info(ShardId, status) ->
     try get_schema_runtime(ShardId) of

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -414,7 +414,7 @@ lookup_message(
                 not_found ->
                     not_found;
                 {error, Reason} ->
-                    {error, unrecoverable, Reason}
+                    {error, unrecoverable, {rocksdb, Reason}}
             end;
         undefined ->
             not_found