Просмотр исходного кода

feat(ds): Assign latest timestamp deterministically

ieQu1 1 год назад
Родитель
Commit
a0a3977043

+ 6 - 6
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -734,20 +734,20 @@ apply(
     {State, Result};
 apply(
     _RaftMeta,
-    #{?tag := storage_event, ?payload := CustomEvent},
+    #{?tag := storage_event, ?payload := CustomEvent, ?now := Now},
     #{db_shard := DBShard, latest := Latest0} = State
 ) ->
-    {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0),
+    Latest = max(Latest0, Now),
     set_ts(DBShard, Latest),
     ?tp(
         debug,
         emqx_ds_replication_layer_storage_event,
         #{
-            shard => DBShard, ts => Timestamp, payload => CustomEvent
+            shard => DBShard, payload => CustomEvent, latest => Latest
         }
     ),
-    Effects = handle_custom_event(DBShard, Timestamp, CustomEvent),
-    {State#{latest := Latest}, ok, Effects}.
+    Effects = handle_custom_event(DBShard, Latest, CustomEvent),
+    {State#{latest => Latest}, ok, Effects}.
 
 -spec tick(integer(), ra_state()) -> ra_machine:effects().
 tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
@@ -791,7 +791,7 @@ snapshot_module() ->
 handle_custom_event(DBShard, Latest, Event) ->
     try
         Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event),
-        [{append, #{?tag => storage_event, ?payload => I}} || I <- Events]
+        [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events]
     catch
         EC:Err:Stacktrace ->
             ?tp(error, ds_storage_custom_even_fail, #{

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -43,5 +43,6 @@
 
 %% custom events
 -define(payload, 2).
+-define(now, 3).
 
 -endif.

+ 49 - 19
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -41,7 +41,7 @@ opts(Overrides) ->
         #{
             backend => builtin,
             %% storage => {emqx_ds_storage_reference, #{}},
-            storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}},
+            storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
             n_shards => 16,
             n_sites => 1,
             replication_factor => 3,
@@ -159,7 +159,6 @@ t_rebalance('end', Config) ->
 t_rebalance(Config) ->
     NMsgs = 50,
     NClients = 5,
-    NEvents = NMsgs * NClients,
     %% List of fake client IDs:
     Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
     %% List of streams that generate messages for each "client" in its own topic:
@@ -168,7 +167,16 @@ t_rebalance(Config) ->
      || ClientId <- Clients
     ],
     %% Interleaved list of events:
-    Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]),
+    Stream0 = emqx_utils_stream:interleave(
+        [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
+    ),
+    Stream = emqx_utils_stream:interleave(
+        [
+            {50, Stream0},
+            emqx_utils_stream:const(add_generation)
+        ],
+        false
+    ),
     Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
     ?check_trace(
         #{timetrap => 30_000},
@@ -176,15 +184,22 @@ t_rebalance(Config) ->
             %% 0. Inject schedulings to make sure the messages are
             %% written to the storage before, during, and after
             %% rebalance:
-            ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{
-                ?snk_kind := test_start_rebalance
-            }),
-            ?force_ordering(#{?snk_kind := test_start_rebalance}, #{
-                ?snk_kind := test_push_message, n := 20
-            }),
-            ?force_ordering(#{?snk_kind := test_end_rebalance}, #{
-                ?snk_kind := test_push_message, n := 30
-            }),
+            ?force_ordering(
+                #{?snk_kind := test_push_message, n := 10},
+                #{?snk_kind := test_start_rebalance}
+            ),
+            ?force_ordering(
+                #{?snk_kind := test_start_rebalance1},
+                #{?snk_kind := test_push_message, n := 20}
+            ),
+            ?force_ordering(
+                #{?snk_kind := test_push_message, n := 30},
+                #{?snk_kind := test_start_rebalance2}
+            ),
+            ?force_ordering(
+                #{?snk_kind := test_end_rebalance},
+                #{?snk_kind := test_push_message, n := 40}
+            ),
             %% 1. Initialize DB on the first node.
             Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
 
@@ -224,7 +239,7 @@ t_rebalance(Config) ->
             ),
 
             %% 3. Start rebalance in the meanwhile:
-            ?tp(test_start_rebalance, #{}),
+            ?tp(test_start_rebalance1, #{}),
             %% 3.1 Join the second site to the DB replication sites.
             ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))),
             %% Should be no-op.
@@ -233,6 +248,7 @@ t_rebalance(Config) ->
 
             ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
 
+            ?tp(test_start_rebalance2, #{}),
             %% Now join the rest of the sites.
             ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
             ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
@@ -619,7 +635,9 @@ without_extra(L) ->
 -type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
 
 %% Create a stream from the topic (wildcards are NOT supported for a
-%% good reason: order of messages is implementation-dependent!):
+%% good reason: order of messages is implementation-dependent!).
+%%
+%% Note: stream produces messages with keys
 -spec ds_topic_stream(binary(), binary(), node()) -> ds_stream().
 ds_topic_stream(ClientId, TopicBin, Node) ->
     Topic = emqx_topic:words(TopicBin),
@@ -638,7 +656,6 @@ ds_topic_stream(ClientId, TopicBin, Node) ->
      || {_RankY, S} <- lists:sort(DSStreams)
     ]).
 
-%% Note: produces messages with keys
 ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
     {ok, Iterator} = ?ON(
         Node,
@@ -647,11 +664,20 @@ ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
     do_ds_topic_generation_stream(Node, Shard, Iterator).
 
 do_ds_topic_generation_stream(Node, Shard, It0) ->
-    Now = 99999999999999999999,
     fun() ->
-        case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of
+        case
+            ?ON(
+                Node,
+                begin
+                    Now = emqx_ds_replication_layer:current_timestamp(?DB, Shard),
+                    emqx_ds_storage_layer:next(Shard, It0, 1, Now)
+                end
+            )
+        of
             {ok, It, []} ->
                 [];
+            {ok, end_of_stream} ->
+                [];
             {ok, It, [KeyMsg]} ->
                 [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)]
         end
@@ -673,7 +699,11 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
                 )
             ),
             ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
-            apply_stream(DB, NodeStream, Stream, N + 1)
+            apply_stream(DB, NodeStream, Stream, N + 1);
+        [add_generation | Stream] ->
+            [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            %% add_generation(Node, DB),
+            apply_stream(DB, NodeStream, Stream, N)
     end.
 
 %% @doc Create an infinite list of messages from a given client:
@@ -724,7 +754,7 @@ verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) ->
             snabbkaffe_diff:assert_lists_eq(
                 ExpectedStream,
                 ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node),
-                ?diff_opts#{comment => #{clientid => ClientId, node => Node}}
+                ?diff_opts
             ),
             ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
         end

+ 20 - 12
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -20,13 +20,14 @@
 -export([
     empty/0,
     list/1,
+    const/1,
     mqueue/1,
     map/2,
     transpose/1,
     chain/1,
     chain/2,
     repeat/1,
-    interleave/1,
+    interleave/2,
     limit_length/2
 ]).
 
@@ -72,6 +73,11 @@ list([]) ->
 list([X | Rest]) ->
     fun() -> [X | list(Rest)] end.
 
+%% @doc Make a stream with a single element infinitely repeated
+-spec const(T) -> stream(T).
+const(T) ->
+    fun() -> [T | const(T)] end.
+
 %% @doc Make a stream out of process message queue.
 -spec mqueue(timeout()) -> stream(any()).
 mqueue(Timeout) ->
@@ -158,8 +164,8 @@ repeat(S) ->
 %% specifies size of the "batch" to be consumed from the stream at a
 %% time (stream is the second tuple element). If element of the list
 %% is a plain stream, then the batch size is assumed to be 1.
--spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X).
-interleave(L0) ->
+-spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X).
+interleave(L0, ContinueAtEmpty) ->
     L = lists:map(
         fun
             (Stream) when is_function(Stream) ->
@@ -170,7 +176,7 @@ interleave(L0) ->
         L0
     ),
     fun() ->
-        do_interleave(0, L, [])
+        do_interleave(ContinueAtEmpty, 0, L, [])
     end.
 
 %% @doc Truncate list to the given length
@@ -281,21 +287,23 @@ csv_read_line([Line | Lines]) ->
 csv_read_line([]) ->
     eof.
 
-do_interleave(_, [], []) ->
+do_interleave(_Cont, _, [], []) ->
     [];
-do_interleave(N, [{N, S} | Rest], Rev) ->
-    do_interleave(0, Rest, [{N, S} | Rev]);
-do_interleave(_, [], Rev) ->
-    do_interleave(0, lists:reverse(Rev), []);
-do_interleave(I, [{N, S} | Rest], Rev) when I < N ->
+do_interleave(Cont, N, [{N, S} | Rest], Rev) ->
+    do_interleave(Cont, 0, Rest, [{N, S} | Rev]);
+do_interleave(Cont, _, [], Rev) ->
+    do_interleave(Cont, 0, lists:reverse(Rev), []);
+do_interleave(Cont, I, [{N, S} | Rest], Rev) when I < N ->
     case next(S) of
+        [] when Cont ->
+            do_interleave(Cont, 0, Rest, Rev);
         [] ->
-            do_interleave(0, Rest, Rev);
+            [];
         [X | S1] ->
             [
                 X
                 | fun() ->
-                    do_interleave(I + 1, [{N, S1} | Rest], Rev)
+                    do_interleave(Cont, I + 1, [{N, S1} | Rest], Rev)
                 end
             ]
     end.