
feat(dssubs): make opening leader store consistent

Accounting for the eventually consistent nature of `emqx_ds:get_streams/3`
and `emqx_ds:next/3`.
Andrew Mayorov, 1 year ago
Parent
Commit
ab7f59fb33
1 changed file with 178 additions and 79 deletions
  1. apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_store.erl (+178 -79)

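For orientation before the diff: a hypothetical caller-side sketch of how a leader process might use the reworked open path. The `ensure_store/1` helper and its error handling are illustrative assumptions, not part of this commit.

%% Hypothetical helper (not part of this commit): open the persisted store for
%% a group, falling back to a fresh store when nothing has been committed yet.
ensure_store(Group) ->
    case emqx_ds_shared_sub_leader_store:open(Group) of
        false ->
            %% Nothing persisted so far; `init/1` now returns a store that is
            %% already marked dirty, so the rootset is written on first commit.
            emqx_ds_shared_sub_leader_store:init(Group);
        Store when is_map(Store) ->
            Store;
        Error = {error, unrecoverable, _Reason} ->
            %% Replaying the log could not catch up with the committed rootset.
            exit(Error)
    end.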

@@ -65,9 +65,16 @@
 -define(STORE_TOPIC_PREFIX, <<"$s">>).
 
 -define(STORE_SK(SPACE, KEY), [SPACE | KEY]).
+-define(STORE_STAGE_ENTRY(SEQNUM, VALUE), {SEQNUM, VALUE}).
+-define(STORE_TOMBSTONE, '$tombstone').
 -define(STORE_PAYLOAD(ID, VALUE), [ID, VALUE]).
+-define(STORE_HEADER_CHANGESEQNUM, '$store.seqnum').
+
 -define(STORE_BATCH_SIZE, 500).
--define(STORE_TOMBSTONE, '$tombstone').
+-define(STORE_SLURP_RETRIES, 2).
+-define(STORE_SLURP_RETRY_TIMEOUT, 1000).
+
+-define(STORE_IS_ROOTSET(VAR), (VAR == seqnum)).
 
 -ifdef(TEST).
 -undef(LEADER_TTL).
@@ -217,8 +224,8 @@ mk_leader_topic(GroupName) ->
 
 %%
 
--type space_name() :: stream | sequence.
--type var_name() :: start_time | rank_progress | seqnum.
+-type space_name() :: stream.
+-type var_name() :: start_time | rank_progress.
 -type space_key() :: nonempty_improper_list(space_name(), _Key).
 
 %% NOTE
@@ -237,12 +244,13 @@ mk_leader_topic(GroupName) ->
     stream := #{emqx_ds:stream() => stream_state()},
     start_time => _SubsriptionStartTime :: emqx_message:timestamp(),
     rank_progress => _RankProgress,
-    %% Internal _sequence number_ variable.
-    seqnum => integer(),
+    %% Internal _sequence number_ that tracks every change.
+    seqnum := integer(),
     %% Mapping between complex keys and seqnums.
     seqmap := #{space_key() => _SeqNum :: integer()},
     %% Stage: uncommitted changes.
-    stage := #{space_key() | var_name() => _Value}
+    stage := #{space_key() | var_name() => _Value},
+    dirty => true
 }.
 
 -type stream_state() :: #{
@@ -252,7 +260,8 @@ mk_leader_topic(GroupName) ->
 
 -spec init(group()) -> t().
 init(Group) ->
-    set(seqnum, 0, mk_store(Group)).
+    %% NOTE: Empty store is dirty because rootset needs to be persisted.
+    mark_dirty(mk_store(Group)).
 
 -spec open(group()) -> t() | false.
 open(Group) ->
@@ -262,22 +271,41 @@ mk_store(Group) ->
     #{
         group => Group,
         stream => #{},
+        seqnum => 0,
         seqmap => #{},
         stage => #{}
     }.
 
 open_store(Store = #{group := Group}) ->
-    %% TODO: Unavailability concerns.
+    ReadRootset = mk_read_root_batch(Group),
+    case emqx_ds:store_batch(?DS_DB, ReadRootset, #{sync => true}) of
+        ok ->
+            false;
+        {error, unrecoverable, {precondition_failed, RootMessage}} ->
+            Rootset = open_root_message(RootMessage),
+            slurp_store(Rootset, Store)
+    end.
+
+slurp_store(Rootset, Acc) ->
+    slurp_store(Rootset, #{}, ?STORE_SLURP_RETRIES, ?STORE_SLURP_RETRY_TIMEOUT, Acc).
+
+slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{group := Group}) ->
     TopicFilter = mk_store_wildcard(Group),
-    Streams = emqx_ds:get_streams(?DS_DB, TopicFilter, _StartTime = 0),
-    Streams =/= [] andalso
-        ds_streams_fold(
-            fun(Message, StoreAcc) -> open_message(Message, StoreAcc) end,
-            Store,
-            Streams,
-            TopicFilter,
-            0
-        ).
+    StreamIts1 = ds_refresh_streams(TopicFilter, _StartTime = 0, StreamIts0),
+    {StreamIts, Store} = ds_streams_fold(
+        fun(Message, StoreAcc) -> open_message(Message, StoreAcc) end,
+        Acc,
+        StreamIts1
+    ),
+    case map_get(seqnum, Store) of
+        SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
+            maps:merge(Store, Rootset);
+        _Mismatch when Retries > 0 ->
+            ok = timer:sleep(RetryTimeout),
+            slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);
+        _Mismatch ->
+            {error, unrecoverable, {leader_store_inconsistent, Store, Rootset}}
+    end.
 
 -spec get(space_name(), _ID, t()) -> _Value.
 get(SpaceName, ID, Store) ->
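The heart of `slurp_store/5` above is a bounded catch-up loop: keep re-reading the store streams until the highest change seqnum observed is at least the seqnum recorded in the durably committed rootset, or give up after a fixed number of retries. A stripped-down sketch of that loop; `catch_up/4` and its `ReadFun` callback are illustrative names, not part of the module.

%% Generic catch-up loop (sketch): ReadFun/0 re-reads the replicated data and
%% returns the highest change seqnum observed so far.
catch_up(ReadFun, Target, Retries, RetryTimeout) ->
    case ReadFun() of
        Observed when Observed >= Target ->
            ok;
        _Behind when Retries > 0 ->
            ok = timer:sleep(RetryTimeout),
            catch_up(ReadFun, Target, Retries - 1, RetryTimeout);
        _Behind ->
            {error, unrecoverable, inconsistent}
    end.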
@@ -299,28 +327,23 @@ size(SpaceName, Store) ->
     map_size(maps:get(SpaceName, Store)).
 
 -spec put(space_name(), _ID, _Value, t()) -> t().
-put(SpaceName, ID, Value, Store0 = #{stage := Stage}) ->
+put(SpaceName, ID, Value, Store0 = #{stage := Stage, seqnum := SeqNum0, seqmap := SeqMap}) ->
     Space0 = maps:get(SpaceName, Store0),
     Space1 = maps:put(ID, Value, Space0),
+    SeqNum = SeqNum0 + 1,
     SK = ?STORE_SK(SpaceName, ID),
-    Store1 = Store0#{
+    Store = Store0#{
         SpaceName := Space1,
-        stage := Stage#{SK => Value}
+        seqnum := SeqNum,
+        stage := Stage#{SK => ?STORE_STAGE_ENTRY(SeqNum, Value)}
     },
     case map_size(Space1) of
         S when S > map_size(Space0) ->
-            assign_seqnum(SK, Store1);
+            Store#{seqmap := maps:put(SK, SeqNum, SeqMap)};
         _ ->
-            Store1
+            Store
     end.
 
-assign_seqnum(SK, Store0 = #{seqmap := SeqMap}) ->
-    SeqNum = get(seqnum, Store0) + 1,
-    Store1 = set(seqnum, SeqNum, Store0),
-    Store1#{
-        seqmap := maps:put(SK, SeqNum, SeqMap)
-    }.
-
 get_seqnum(?STORE_SK(_SpaceName, _) = SK, SeqMap) ->
     maps:get(SK, SeqMap);
 get_seqnum(_VarName, _SeqMap) ->
@@ -331,10 +354,12 @@ get(VarName, Store) ->
     maps:get(VarName, Store).
 
 -spec set(var_name(), _Value, t()) -> t().
-set(VarName, Value, Store0 = #{stage := Stage}) ->
-    Store0#{
+set(VarName, Value, Store = #{stage := Stage, seqnum := SeqNum0}) ->
+    SeqNum = SeqNum0 + 1,
+    Store#{
         VarName => Value,
-        stage := Stage#{VarName => Value}
+        seqnum := SeqNum,
+        stage := Stage#{VarName => ?STORE_STAGE_ENTRY(SeqNum, Value)}
     }.
 
 -spec delete(space_name(), _ID, t()) -> t().
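To make the new staging format concrete, a small illustrative sequence; the IDs and values below are placeholders, and the commented shapes follow from `put/4` and `set/3` as defined above.

%% Illustration only: how seqnum, seqmap and stage evolve after one put/4 and
%% one set/3 on a freshly initialized store (IDs and values are made up).
staging_example() ->
    Store0 = emqx_ds_shared_sub_leader_store:init(<<"g1">>),
    Store1 = emqx_ds_shared_sub_leader_store:put(stream, stream_a, progress_a, Store0),
    %% seqnum => 1, seqmap => #{[stream | stream_a] => 1},
    %% stage  => #{[stream | stream_a] => {1, progress_a}}
    Store2 = emqx_ds_shared_sub_leader_store:set(start_time, 1234, Store1),
    %% seqnum => 2, start_time => 1234,
    %% stage additionally holds start_time => {2, 1234}
    Store2.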
@@ -343,6 +368,9 @@ delete(SpaceName, ID, Store = #{stage := Stage, seqmap := SeqMap}) ->
     Space1 = maps:remove(ID, Space0),
     case map_size(Space1) of
         S when S < map_size(Space0) ->
+            %% NOTE
+            %% We do not bump seqnum on deletions because tracking them does
+            %% not make a lot of sense, assuming batches are atomic.
             SK = ?STORE_SK(SpaceName, ID),
             Store#{
                 SpaceName := Space1,
@@ -353,7 +381,15 @@ delete(SpaceName, ID, Store = #{stage := Stage, seqmap := SeqMap}) ->
             Store
     end.
 
+mark_dirty(Store) ->
+    Store#{dirty => true}.
+
+mark_clean(Store) ->
+    maps:remove(dirty, Store).
+
 -spec dirty(t()) -> boolean().
+dirty(#{dirty := Dirty}) ->
+    Dirty;
 dirty(#{stage := Stage}) ->
     map_size(Stage) > 0.
 
@@ -361,14 +397,17 @@ dirty(#{stage := Stage}) ->
 %% Does nothing if there are no staged changes.
 -spec commit_dirty(leader_claim(_), t()) ->
     {ok, t()} | emqx_ds:error(_).
-commit_dirty(_LeaderClaim, Store = #{stage := Stage}) when map_size(Stage) =:= 0 ->
-    {ok, Store};
-commit_dirty(LeaderClaim, Store = #{group := Group}) ->
+commit_dirty(LeaderClaim, Store = #{dirty := true}) ->
+    commit(LeaderClaim, Store);
+commit_dirty(LeaderClaim, Store = #{stage := Stage}) when map_size(Stage) > 0 ->
+    commit(LeaderClaim, Store).
+
+commit(LeaderClaim, Store = #{group := Group}) ->
     Operations = mk_store_operations(Store),
     Batch = mk_store_batch(Group, LeaderClaim, Operations),
     case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
         ok ->
-            {ok, Store#{stage := #{}}};
+            {ok, mark_clean(Store#{stage := #{}})};
         {error, unrecoverable, {precondition_failed, Mismatch}} ->
             {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
         Error ->
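A hypothetical sketch of how the leader might drive periodic commits after this change; `checkpoint/2` is illustrative and guards with `dirty/1`, since `commit_dirty/2` above no longer carries a no-op clause for a store with an empty stage.

%% Hypothetical checkpoint step on the leader side (not part of this commit).
checkpoint(LeaderClaim, Store0) ->
    case emqx_ds_shared_sub_leader_store:dirty(Store0) of
        true ->
            case emqx_ds_shared_sub_leader_store:commit_dirty(LeaderClaim, Store0) of
                {ok, Store} ->
                    {ok, Store};
                {error, unrecoverable, {leadership_lost, OtherLeader}} ->
                    {stop, {leadership_lost, OtherLeader}};
                Error ->
                    Error
            end;
        false ->
            {ok, Store0}
    end.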
@@ -386,7 +425,7 @@ commit_renew(LeaderClaim, TS, Store = #{group := Group}) ->
             Batch = mk_store_batch(Group, LeaderClaim, RenewedClaim, Operations),
             case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
                 ok ->
-                    {ok, RenewedClaim, Store#{stage := #{}}};
+                    {ok, RenewedClaim, mark_clean(Store#{stage := #{}})};
                 {error, unrecoverable, {precondition_failed, Mismatch}} ->
                     {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
                 Error ->
@@ -408,15 +447,28 @@ mk_store_batch(Group, ExistingClaim, RenewedClaim, Operations) ->
         operations = [encode_leader_claim(Group, RenewedClaim) | Operations]
     }.
 
-mk_store_operations(#{group := Group, stage := Stage, seqmap := SeqMap}) ->
+mk_store_operations(Store = #{group := Group, stage := Stage, seqmap := SeqMap}) ->
+    %% NOTE: Always persist rootset.
+    RootOperation = mk_store_root(Store),
     maps:fold(
         fun(SK, Value, Acc) ->
             [mk_store_operation(Group, SK, Value, SeqMap) | Acc]
         end,
-        [],
+        [RootOperation],
         Stage
     ).
 
+mk_store_root(Store = #{group := Group}) ->
+    Payload = maps:filter(fun(V, _) -> ?STORE_IS_ROOTSET(V) end, Store),
+    #message{
+        id = <<>>,
+        qos = 0,
+        from = Group,
+        topic = mk_store_root_topic(Group),
+        payload = term_to_binary(Payload),
+        timestamp = 0
+    }.
+
 mk_store_operation(Group, SK, ?STORE_TOMBSTONE, SeqMap) ->
     {delete, #message_matcher{
         from = Group,
@@ -424,7 +476,7 @@ mk_store_operation(Group, SK, ?STORE_TOMBSTONE, SeqMap) ->
         payload = '_',
         timestamp = get_seqnum(SK, SeqMap)
     }};
-mk_store_operation(Group, SK, Value, SeqMap) ->
+mk_store_operation(Group, SK, ?STORE_STAGE_ENTRY(ChangeSeqNum, Value), SeqMap) ->
     %% NOTE
     %% Using `SeqNum` as timestamp to further disambiguate one record (message) from
     %% another in the DS DB keyspace. As an example, Skipstream-LTS storage layout
@@ -438,39 +490,54 @@ mk_store_operation(Group, SK, Value, SeqMap) ->
         from = Group,
         topic = mk_store_topic(Group, SK, SeqMap),
         payload = term_to_binary(Payload),
-        timestamp = get_seqnum(SK, SeqMap)
+        timestamp = get_seqnum(SK, SeqMap),
+        %% NOTE: Preserving the seqnum when this change has happened.
+        headers = #{?STORE_HEADER_CHANGESEQNUM => ChangeSeqNum}
     }.
 
-open_message(Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum}, Store) ->
-    try
-        case emqx_topic:tokens(Topic) of
-            [_Prefix, _Group, SpaceTok, _SeqTok] ->
-                SpaceName = token_to_space(SpaceTok),
-                ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
-                open_message(SpaceName, ID, SeqNum, Value, Store);
-            [_Prefix, _Group, VarTok] ->
-                VarName = token_to_varname(VarTok),
-                Value = binary_to_term(Payload),
-                open_message(VarName, Value, Store)
-        end
-    catch
-        error:_ ->
-            ?tp(warning, "dssubs_leader_store_unrecognized_message", #{
-                group => maps:get(group, Store),
-                message => Msg
-            })
-    end.
+open_root_message(#message{payload = Payload, timestamp = 0}) ->
+    #{} = binary_to_term(Payload).
+
+open_message(
+    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}, Store
+) ->
+    Entry =
+        try
+            ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
+            case emqx_topic:tokens(Topic) of
+                [_Prefix, _Group, SpaceTok, _SeqTok] ->
+                    SpaceName = token_to_space(SpaceTok),
+                    ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
+                    %% TODO: Records.
+                    Record = {SpaceName, ID, Value, SeqNum};
+                [_Prefix, _Group, VarTok] ->
+                    VarName = token_to_varname(VarTok),
+                    Value = binary_to_term(Payload),
+                    Record = {VarName, Value}
+            end,
+            {ChangeSeqNum, Record}
+        catch
+            error:_ ->
+                ?tp(warning, "dssubs_leader_store_unrecognized_message", #{
+                    group => maps:get(group, Store),
+                    message => Msg
+                }),
+                unrecognized
+        end,
+    open_entry(Entry, Store).
 
-open_message(SpaceName, ID, SeqNum, Value, Store = #{seqmap := SeqMap}) ->
+open_entry({ChangeSeqNum, Record}, Store = #{seqnum := SeqNum}) ->
+    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)}).
+
+open_record({SpaceName, ID, Value, SeqNum}, Store = #{seqmap := SeqMap}) ->
     Space0 = maps:get(SpaceName, Store),
     Space1 = maps:put(ID, Value, Space0),
     SK = ?STORE_SK(SpaceName, ID),
     Store#{
         SpaceName := Space1,
         seqmap := SeqMap#{SK => SeqNum}
-    }.
-
-open_message(VarName, Value, Store) ->
+    };
+open_record({VarName, Value}, Store) ->
     Store#{VarName => Value}.
 
 mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
@@ -478,6 +545,9 @@ mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
 mk_store_payload(_VarName, Value) ->
     Value.
 
+mk_store_root_topic(GroupName) ->
+    emqx_topic:join([?STORE_TOPIC_PREFIX, GroupName]).
+
 mk_store_topic(GroupName, ?STORE_SK(SpaceName, _) = SK, SeqMap) ->
     SeqNum = get_seqnum(SK, SeqMap),
     SeqTok = integer_to_binary(SeqNum),
@@ -486,32 +556,63 @@ mk_store_topic(GroupName, VarName, _SeqMap) ->
     emqx_topic:join([?STORE_TOPIC_PREFIX, GroupName, varname_to_token(VarName)]).
 
 mk_store_wildcard(GroupName) ->
-    [?STORE_TOPIC_PREFIX, GroupName, '#'].
+    [?STORE_TOPIC_PREFIX, GroupName, '+', '#'].
+
+mk_read_root_batch(Group) ->
+    %% NOTE
+    %% Construct batch that essentially does nothing but reads rootset in a consistent
+    %% manner.
+    Matcher = #message_matcher{
+        from = Group,
+        topic = mk_store_root_topic(Group),
+        payload = '_',
+        timestamp = 0
+    },
+    #dsbatch{
+        preconditions = [{unless_exists, Matcher}],
+        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
+    }.
 
-ds_streams_fold(Fun, AccIn, Streams, TopicFilter, StartTime) ->
+ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
+    Streams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
     lists:foldl(
         fun({_Rank, Stream}, Acc) ->
-            ds_stream_fold(Fun, Acc, Stream, TopicFilter, StartTime)
+            case StreamIts of
+                #{Stream := _It} ->
+                    Acc;
+                #{} ->
+                    %% TODO: Gracefully handle `emqx_ds:error(_)`?
+                    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
+                    Acc#{Stream => It}
+            end
         end,
-        AccIn,
+        StreamIts,
         Streams
     ).
 
-ds_stream_fold(Fun, Acc, Stream, TopicFilter, StartTime) ->
-    %% TODO: Gracefully handle `emqx_ds:error(_)`?
-    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
-    ds_stream_fold(Fun, Acc, It).
+ds_streams_fold(Fun, AccIn, StreamItsIn) ->
+    maps:fold(
+        fun(Stream, It0, {StreamIts, Acc0}) ->
+            {It, Acc} = ds_stream_fold(Fun, Acc0, It0),
+            {StreamIts#{Stream := It}, Acc}
+        end,
+        {StreamItsIn, AccIn},
+        StreamItsIn
+    ).
 
+ds_stream_fold(_Fun, Acc0, end_of_stream) ->
+    %% NOTE: Assuming atom `end_of_stream` is not a valid `emqx_ds:iterator()`.
+    {end_of_stream, Acc0};
 ds_stream_fold(Fun, Acc0, It0) ->
     %% TODO: Gracefully handle `emqx_ds:error(_)`?
     case emqx_ds:next(?DS_DB, It0, ?STORE_BATCH_SIZE) of
         {ok, It, Messages = [_ | _]} ->
             Acc1 = lists:foldl(fun({_Key, Msg}, Acc) -> Fun(Msg, Acc) end, Acc0, Messages),
             ds_stream_fold(Fun, Acc1, It);
-        {ok, _It, []} ->
-            Acc0;
+        {ok, It, []} ->
+            {It, Acc0};
         {ok, end_of_stream} ->
-            Acc0
+            {end_of_stream, Acc0}
     end.
 
 %%
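A note on `mk_read_root_batch/1` above: it turns the write path into a consistent read. The batch may only apply when the root message is absent, so if the root exists the `precondition_failed` error hands the stored message back to the caller, and if it does not exist the single delete operation matches nothing. A generic sketch of the same trick; the function name and `DB` argument are illustrative, and it assumes the emqx_ds header defining `#dsbatch{}` and `#message_matcher{}` is included.

%% Sketch: read one message through emqx_ds:store_batch/3 preconditions.
read_via_precondition(DB, Matcher) ->
    Batch = #dsbatch{
        preconditions = [{unless_exists, Matcher}],
        %% No-op when the precondition holds: nothing can match this delete.
        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
    },
    case emqx_ds:store_batch(DB, Batch, #{sync => true}) of
        ok ->
            not_found;
        {error, unrecoverable, {precondition_failed, Message}} ->
            {ok, Message};
        Error ->
            Error
    end.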
@@ -525,9 +626,7 @@ token_to_space(<<"prog">>) -> progress;
 token_to_space(<<"seq">>) -> sequence.
 
 varname_to_token(rank_progress) -> <<"rankp">>;
-varname_to_token(start_time) -> <<"stime">>;
-varname_to_token(seqnum) -> <<"seqn">>.
+varname_to_token(start_time) -> <<"stime">>.
 
 token_to_varname(<<"rankp">>) -> rank_progress;
-token_to_varname(<<"stime">>) -> start_time;
-token_to_varname(<<"seqn">>) -> seqnum.
+token_to_varname(<<"stime">>) -> start_time.