Преглед изворни кода

fix(dsrepl): reassign timestamp at the time of submission

This is needed to ensure total message order for a shard, and
guarantee that no messages will be written "in the past", which
may break replay consistency.
Andrew Mayorov пре 2 година
родитељ
комит
be793e4735

+ 23 - 13
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -555,8 +555,7 @@ ra_start_shard(DB, Shard) ->
 ra_store_batch(DB, Shard, Messages) ->
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
     Command = #{
         ?tag => ?BATCH,
         ?tag => ?BATCH,
-        ?batch_messages => Messages,
-        ?timestamp => emqx_ds:timestamp_us()
+        ?batch_messages => Messages
     },
     },
     case ra:process_command(ra_leader_servers(DB, Shard), Command) of
     case ra:process_command(ra_leader_servers(DB, Shard), Command) of
         {ok, Result, _Leader} ->
         {ok, Result, _Leader} ->
@@ -652,8 +651,7 @@ apply(
     #{index := RaftIdx},
     #{index := RaftIdx},
     #{
     #{
         ?tag := ?BATCH,
         ?tag := ?BATCH,
-        ?batch_messages := MessagesIn,
-        ?timestamp := TimestampLocal
+        ?batch_messages := MessagesIn
     },
     },
     #{latest := Latest} = State
     #{latest := Latest} = State
 ) ->
 ) ->
@@ -661,18 +659,30 @@ apply(
     %% Unique timestamp tracking real time closely.
     %% Unique timestamp tracking real time closely.
     %% With microsecond granularity it should be nearly impossible for it to run
     %% With microsecond granularity it should be nearly impossible for it to run
     %% too far ahead than the real time clock.
     %% too far ahead than the real time clock.
-    Timestamp = max(Latest + 1, TimestampLocal),
-    Messages = assign_timestamps(Timestamp, MessagesIn),
+    {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
+    %% TODO
+    %% Batch is now reversed, but it should not make a lot of difference.
+    %% Even if it would be in order, it's still possible to write messages far away
+    %% in the past, i.e. when replica catches up with the leader. Storage layer
+    %% currently relies on wall clock time to decide if it's safe to iterate over
+    %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
+    %% time instead.
     Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
     Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
-    %% NOTE: Last assigned timestamp.
-    NLatest = Timestamp + length(Messages) - 1,
     NState = State#{latest := NLatest},
     NState = State#{latest := NLatest},
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     Effect = {release_cursor, RaftIdx, NState},
     Effect = {release_cursor, RaftIdx, NState},
     {NState, Result, Effect}.
     {NState, Result, Effect}.
 
 
-assign_timestamps(Timestamp, [MessageIn | Rest]) ->
-    Message = emqx_message:set_timestamp(Timestamp, MessageIn),
-    [Message | assign_timestamps(Timestamp + 1, Rest)];
-assign_timestamps(_Timestamp, []) ->
-    [].
+assign_timestamps(Latest, Messages) ->
+    assign_timestamps(Latest, Messages, []).
+
+assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
+    case emqx_message:timestamp(MessageIn) of
+        Later when Later > Latest ->
+            assign_timestamps(Later, Rest, [MessageIn | Acc]);
+        _Earlier ->
+            Message = emqx_message:set_timestamp(Latest + 1, MessageIn),
+            assign_timestamps(Latest + 1, Rest, [Message | Acc])
+    end;
+assign_timestamps(Latest, [], Acc) ->
+    {Latest, Acc}.

+ 8 - 3
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -67,8 +67,9 @@ store_batch(DB, Messages, Opts) ->
     case maps:get(atomic, Opts, false) of
     case maps:get(atomic, Opts, false) of
         false ->
         false ->
             lists:foreach(
             lists:foreach(
-                fun(Message) ->
-                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                fun(MessageIn) ->
+                    Shard = emqx_ds_replication_layer:shard_of_message(DB, MessageIn, clientid),
+                    Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
                     gen_server:call(
                     gen_server:call(
                         ?via(DB, Shard),
                         ?via(DB, Shard),
                         #enqueue_req{
                         #enqueue_req{
@@ -83,10 +84,14 @@ store_batch(DB, Messages, Opts) ->
         true ->
         true ->
             maps:foreach(
             maps:foreach(
                 fun(Shard, Batch) ->
                 fun(Shard, Batch) ->
+                    Timestamp = emqx_ds:timestamp_us(),
                     gen_server:call(
                     gen_server:call(
                         ?via(DB, Shard),
                         ?via(DB, Shard),
                         #enqueue_atomic_req{
                         #enqueue_atomic_req{
-                            batch = Batch,
+                            batch = [
+                                emqx_message:set_timestamp(Timestamp, Message)
+                             || Message <- Batch
+                            ],
                             sync = Sync
                             sync = Sync
                         },
                         },
                         infinity
                         infinity

+ 1 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -118,8 +118,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
     lists:foreach(
         fun(Msg) ->
         fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+            Key = <<(emqx_message:timestamp(Msg)):64>>,
             Val = term_to_binary(Msg),
             Val = term_to_binary(Msg),
             rocksdb:put(DB, CF, Key, Val, [])
             rocksdb:put(DB, CF, Key, Val, [])
         end,
         end,