Przeglądaj źródła

feat(dssubs): ensure store detects concurrent ops during commits

Andrew Mayorov 1 rok temu
rodzic
commit
ad4e867ee3

+ 50 - 21
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -84,8 +84,6 @@
 -define(STORE_SLURP_RETRIES, 2).
 -define(STORE_SLURP_RETRY_TIMEOUT, 1000).
 
--define(STORE_IS_ROOTSET(VAR), (VAR == seqnum)).
-
 -ifdef(TEST).
 -undef(LEADER_TTL).
 -undef(LEADER_HEARTBEAT_INTERVAL).
@@ -296,10 +294,12 @@ mk_leader_topic(ID) ->
     },
     stream := #{emqx_ds:stream() => stream_state()},
     rank_progress => _RankProgress,
-    %% Internal _sequence number_ that tracks every change.
+    %% Internal _sequence numbers_ that tracks every change.
     seqnum := integer(),
     %% Mapping between complex keys and seqnums.
     seqmap := #{space_key() => _SeqNum :: integer()},
+    %% Last committed sequence number.
+    committed := integer(),
     %% Stage: uncommitted changes.
     stage := #{space_key() | var_name() => _Value}
 }.
@@ -334,6 +334,7 @@ mk_store(ID) ->
         properties => #{},
         seqnum => 0,
         seqmap => #{},
+        committed => 0,
         stage => #{}
     }.
 
@@ -363,7 +364,7 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
         %% concerning, because this suggests there were concurrent writes that slipped
         %% past the leadership claim guards, yet we can still make progress.
         SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
-            maps:merge(Store, Rootset);
+            reset_dirty(maps:merge(Store, Rootset));
         _Mismatch when Retries > 0 ->
             ok = timer:sleep(RetryTimeout),
             slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);
@@ -458,29 +459,40 @@ create(Store) ->
     Batch = mk_store_create_batch(Store),
     case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
         ok ->
-            {ok, Store#{stage := #{}}};
+            {ok, reset_dirty(Store)};
         {error, unrecoverable, {precondition_failed, _Mismatch}} ->
             exists;
         Error ->
             Error
     end.
 
--spec destroy(t()) -> ok | emqx_ds:error(_).
+-spec destroy(t()) -> ok | conflict | emqx_ds:error(_).
 destroy(Store) ->
     Batch = mk_store_delete_batch(Store),
-    emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}).
+    case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
+        ok ->
+            ok;
+        {error, unrecoverable, {precondition_failed, not_found}} ->
+            %% Probably was deleted concurrently.
+            ok;
+        {error, unrecoverable, {precondition_failed, #message{}}} ->
+            %% Probably was updated concurrently.
+            conflict;
+        Error ->
+            Error
+    end.
 
 %% @doc Commit staged changes to the storage.
 %% Does nothing if there are no staged changes.
 -spec commit_dirty(leader_claim(_), t()) ->
-    {ok, t()} | emqx_ds:error(_).
+    {ok, t()} | destroyed | emqx_ds:error(_).
 commit_dirty(LeaderClaim, Store = #{stage := Stage}) when map_size(Stage) > 0 ->
     Batch = mk_store_leader_batch(Store, LeaderClaim),
     case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
         ok ->
-            {ok, Store#{stage := #{}}};
+            {ok, reset_dirty(Store)};
         {error, unrecoverable, {precondition_failed, Mismatch}} ->
-            {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
+            map_commit_precondition_failure(Mismatch);
         Error ->
             Error
     end;
@@ -490,16 +502,16 @@ commit_dirty(_LeaderClaim, Store) ->
 %% @doc Commit staged changes and renew leadership at the same time.
 %% Goes to the storage even if there are no staged changes.
 -spec commit_renew(leader_claim(ID), emqx_message:timestamp(), t()) ->
-    {ok, leader_claim(ID), t()} | emqx_ds:error(_).
+    {ok, leader_claim(ID), t()} | destroyed | emqx_ds:error(_).
 commit_renew(LeaderClaim, TS, Store) ->
     case renew_claim(LeaderClaim, TS) of
         RenewedClaim when RenewedClaim =/= false ->
             Batch = mk_store_leader_batch(Store, LeaderClaim, RenewedClaim),
             case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
                 ok ->
-                    {ok, RenewedClaim, Store#{stage := #{}}};
+                    {ok, RenewedClaim, reset_dirty(Store)};
                 {error, unrecoverable, {precondition_failed, Mismatch}} ->
-                    {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
+                    map_commit_precondition_failure(Mismatch);
                 Error ->
                     Error
             end;
@@ -507,16 +519,33 @@ commit_renew(LeaderClaim, TS, Store) ->
             {error, unrecoverable, leader_claim_outdated}
     end.
 
+map_commit_precondition_failure(not_found) ->
+    %% Assuming store was destroyed concurrently.
+    destroyed;
+map_commit_precondition_failure(Msg = #message{topic = Topic}) ->
+    case emqx_topic:tokens(Topic) of
+        [?LEADER_TOPIC_PREFIX | _] ->
+            {error, unrecoverable, {leadership_lost, decode_leader_msg(Msg)}};
+        [?STORE_TOPIC_PREFIX | _] ->
+            Rootset = open_root_message(Msg),
+            {error, unrecoverable, {concurrent_update, Rootset}}
+    end.
+
+reset_dirty(Stage = #{seqnum := SeqNum}) ->
+    Stage#{stage := #{}, committed := SeqNum}.
+
 mk_store_leader_batch(Store = #{id := ID}, LeaderClaim) ->
+    RootPrecondition = {if_exists, mk_store_root_matcher(Store)},
+    LeaderPrecondition = mk_leader_precondition(ID, LeaderClaim),
     #dsbatch{
-        preconditions = [mk_leader_precondition(ID, LeaderClaim)],
+        preconditions = [LeaderPrecondition, RootPrecondition],
         operations = mk_store_operations(Store)
     }.
 
 mk_store_leader_batch(Store = #{id := ID}, ExistingClaim, RenewedClaim) ->
-    #dsbatch{
-        preconditions = [mk_leader_precondition(ID, ExistingClaim)],
-        operations = [encode_leader_claim(ID, RenewedClaim) | mk_store_operations(Store)]
+    Batch = #dsbatch{operations = Operations} = mk_store_leader_batch(Store, ExistingClaim),
+    Batch#dsbatch{
+        operations = [encode_leader_claim(ID, RenewedClaim) | Operations]
     }.
 
 mk_store_create_batch(Store = #{id := ID}) ->
@@ -565,8 +594,8 @@ mk_store_operations(Store = #{id := ID, stage := Stage, seqmap := SeqMap}) ->
         Stage
     ).
 
-mk_store_root(Store = #{id := ID}) ->
-    Payload = maps:filter(fun(V, _) -> ?STORE_IS_ROOTSET(V) end, Store),
+mk_store_root(#{id := ID, seqnum := SeqNum}) ->
+    Payload = #{seqnum => SeqNum},
     #message{
         id = <<>>,
         qos = 0,
@@ -576,8 +605,8 @@ mk_store_root(Store = #{id := ID}) ->
         timestamp = 0
     }.
 
-mk_store_root_matcher(Store = #{id := ID}) ->
-    Payload = maps:filter(fun(V, _) -> ?STORE_IS_ROOTSET(V) end, Store),
+mk_store_root_matcher(#{id := ID, committed := Committed}) ->
+    Payload = #{seqnum => Committed},
     #message_matcher{
         from = ID,
         topic = mk_store_root_topic(ID),