Просмотр исходного кода

fix(dsraft): always persist raftidx on batches with preconditions

Before this commit, a batch whose preconditions had failed could succeed
on replay, e.g. when later batches without preconditions in the log made
those conditions true.
Andrew Mayorov 1 год назад
Родитель
Commit
ce40e36627

+ 3 - 2
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -1002,7 +1002,7 @@ apply(
             Result = store_batch_nondurable(DBShard, Operations),
             Result = store_batch_nondurable(DBShard, Operations),
             Effects = try_release_log(Stats, RaftMeta, State);
             Effects = try_release_log(Stats, RaftMeta, State);
         ok ->
         ok ->
-            %% Preconditions succeeded, need to persist `Latest` in the storage layer.
+            %% Preconditions succeeded, need to persist `RaftIdx` in the storage layer.
             Result = store_batch_nondurable(DBShard, Operations),
             Result = store_batch_nondurable(DBShard, Operations),
             Result == ok andalso update_storage_raidx(DBShard, RaftIdx),
             Result == ok andalso update_storage_raidx(DBShard, RaftIdx),
             Effects = try_release_log(Stats, RaftMeta, State);
             Effects = try_release_log(Stats, RaftMeta, State);
@@ -1011,8 +1011,9 @@ apply(
             %% This is log replay, reply with `false`, no one expects the reply anyway.
             %% This is log replay, reply with `false`, no one expects the reply anyway.
             Effects = [];
             Effects = [];
         PreconditionFailed = {precondition_failed, _} ->
         PreconditionFailed = {precondition_failed, _} ->
-            %% Preconditions failed. Skip the batch.
+            %% Preconditions failed. Skip the batch, persist `RaftIdx` in the storage layer.
             Result = {error, unrecoverable, PreconditionFailed},
             Result = {error, unrecoverable, PreconditionFailed},
+            update_storage_raidx(DBShard, RaftIdx),
             Effects = [];
             Effects = [];
         Result = {error, unrecoverable, Reason} ->
         Result = {error, unrecoverable, Reason} ->
             ?tp(error, "emqx_ds_replication_apply_batch_failed", #{
             ?tp(error, "emqx_ds_replication_apply_batch_failed", #{

+ 61 - 43
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -197,37 +197,6 @@ t_preconditions_idempotent(Config) ->
     C1 = <<"C1">>,
     C1 = <<"C1">>,
     Topic1 = <<"t/foo">>,
     Topic1 = <<"t/foo">>,
     Topic2 = <<"t/bar/xyz">>,
     Topic2 = <<"t/bar/xyz">>,
-    Messages = [
-        message(C1, Topic1, <<"M0">>, 0),
-        message(C1, Topic2, <<"M0">>, 0),
-        message(C1, Topic1, <<"M1">>, 1),
-        message(C1, Topic2, <<"M1">>, 1),
-        message(C1, Topic1, <<"M2">>, 2),
-        message(C1, Topic2, <<"M2">>, 2),
-        message(C1, Topic1, <<"M100">>, 100)
-    ],
-    Batch1 = [
-        message(C1, Topic2, <<"M200">>, 200),
-        message(C1, Topic1, <<"M300">>, 300)
-    ],
-    Since1 = 350,
-    Since2 = 600,
-    Batch2 = #dsbatch{
-        preconditions = [
-            {if_exists, #message_matcher{from = C1, topic = Topic2, timestamp = 400, payload = '_'}}
-        ],
-        operations = [
-            message(C1, Topic1, <<"M5">>, 500)
-        ]
-    },
-    Batch3 = #dsbatch{
-        preconditions = [
-            {if_exists, #message_matcher{from = C1, topic = Topic1, timestamp = 100, payload = '_'}}
-        ],
-        operations = [
-            message(C1, Topic2, <<"M4">>, 400)
-        ]
-    },
 
 
     Nodes = [N1, N2] = ?config(nodes, Config),
     Nodes = [N1, N2] = ?config(nodes, Config),
     _Specs = [NS1, _] = ?config(specs, Config),
     _Specs = [NS1, _] = ?config(specs, Config),
@@ -238,7 +207,7 @@ t_preconditions_idempotent(Config) ->
         append_only => false,
         append_only => false,
         replication_options => #{
         replication_options => #{
             %% Make sure snapshots are taken eagerly.
             %% Make sure snapshots are taken eagerly.
-            snapshot_interval => 4
+            snapshot_interval => 6
         }
         }
     }),
     }),
     ?check_trace(
     ?check_trace(
@@ -247,25 +216,46 @@ t_preconditions_idempotent(Config) ->
             assert_db_open(Nodes, ?DB, Opts),
             assert_db_open(Nodes, ?DB, Opts),
 
 
             %% Store several messages.
             %% Store several messages.
+            Messages = [
+                message(C1, Topic1, <<"T1/0">>, 0),
+                message(C1, Topic2, <<"T2/0">>, 0),
+                message(C1, Topic1, <<"T1/1">>, 1),
+                message(C1, Topic2, <<"T2/2">>, 1),
+                message(C1, Topic1, <<"T1/2">>, 2),
+                message(C1, Topic2, <<"T2/2">>, 2),
+                message(C1, Topic1, <<"T1/100">>, 100)
+            ],
             [ok = ?ON(N2, emqx_ds:store_batch(?DB, [M], #{sync => true})) || M <- Messages],
             [ok = ?ON(N2, emqx_ds:store_batch(?DB, [M], #{sync => true})) || M <- Messages],
-            ?assertEqual(
-                ok,
-                ?ON(N2, emqx_ds:store_batch(?DB, Batch1, #{sync => true}))
-            ),
 
 
             %% Add a generation. This will cause the storage layer to flush.
             %% Add a generation. This will cause the storage layer to flush.
+            Since1 = 300,
             ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since1)),
             ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since1)),
 
 
             %% Store batches with preconditions.
             %% Store batches with preconditions.
+            Batch1 = #dsbatch{
+                preconditions = [
+                    %% Appears later, as part of `Batch2`.
+                    {if_exists, #message_matcher{
+                        from = C1, topic = Topic1, timestamp = 400, payload = '_'
+                    }}
+                ],
+                operations = [
+                    message(C1, Topic1, <<"Should not be here">>, 500)
+                ]
+            },
             ?assertMatch(
             ?assertMatch(
-                %% No `{Topic2, _TS = 400}` message yet, should fail.
+                %% No `{Topic1, _TS = 400}` message yet, should fail.
                 {error, _, {precondition_failed, _}},
                 {error, _, {precondition_failed, _}},
-                ?ON(N2, emqx_ds:store_batch(?DB, Batch2, #{sync => true}))
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch1, #{sync => true}))
             ),
             ),
+            Batch2 = [
+                message(C1, Topic1, <<"T1/400">>, 400),
+                message(C1, Topic2, <<"T2/400">>, 400)
+            ],
             ?assertEqual(
             ?assertEqual(
-                %% Only now `{Topic2, _TS = 400}` should be stored.
+                %% Only now `{Topic1, _TS = 400}` should be stored.
                 ok,
                 ok,
-                ?ON(N2, emqx_ds:store_batch(?DB, Batch3, #{sync => true}))
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch2, #{sync => true}))
             ),
             ),
 
 
             %% Restart N1 and wait until it is ready.
             %% Restart N1 and wait until it is ready.
@@ -282,8 +272,36 @@ t_preconditions_idempotent(Config) ->
             ),
             ),
             emqx_ds_test_helpers:assert_same_set(N1Msgs1, N2Msgs1),
             emqx_ds_test_helpers:assert_same_set(N1Msgs1, N2Msgs1),
 
 
+            Batch3 = #dsbatch{
+                preconditions = [
+                    %% Exists at this point.
+                    {unless_exists, #message_matcher{
+                        from = C1, topic = Topic1, timestamp = 400, payload = '_'
+                    }}
+                ],
+                operations = [
+                    message(C1, Topic2, <<"Should not be here">>, 500)
+                ]
+            },
+            ?assertMatch(
+                %% There is already a `{Topic1, _TS = 400}` message, should fail.
+                {error, _, {precondition_failed, _}},
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch3, #{sync => true}))
+            ),
+            Batch4 = [
+                {delete, #message_matcher{
+                    from = C1, topic = Topic1, timestamp = 400, payload = '_'
+                }}
+            ],
+            ?assertEqual(
+                %% Only now `{Topic1, _TS = 400}` should be deleted.
+                ok,
+                ?ON(N2, emqx_ds:store_batch(?DB, Batch4, #{sync => true}))
+            ),
+
             %% Add one more generation, idempotency should still hold if it's
             %% Add one more generation, idempotency should still hold if it's
             %% the last log entry.
             %% the last log entry.
+            Since2 = 600,
             ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since2)),
             ok = ?ON(N2, emqx_ds_replication_layer:add_generation(?DB, Since2)),
 
 
             %% Restart N1 and wait until it is ready.
             %% Restart N1 and wait until it is ready.
@@ -305,9 +323,9 @@ t_preconditions_idempotent(Config) ->
             %% twice, once per each restart.
             %% twice, once per each restart.
             Events = ?of_kind(ds_ra_apply_batch, ?of_node(N1, Trace)),
             Events = ?of_kind(ds_ra_apply_batch, ?of_node(N1, Trace)),
             ?assertMatch(
             ?assertMatch(
-                % Batch2, Batch3, Batch2, Batch3, Batch2, Batch3
-                [_, _, _, _, _, _],
-                [E || E = #{latest := L} <- Events, L > Since1]
+                %% Batch1, Batch2, Batch1, Batch2, Batch3, Batch4, Batch1, Batch2, Batch3, Batch4
+                [_, _, _, _, _, _, _, _, _, _],
+                [E || E = #{latest := L} <- Events, L > (_Since1 = 300)]
             )
             )
         end
         end
     ).
     ).