|
|
@@ -672,22 +672,15 @@ apply(
|
|
|
?tag := ?BATCH,
|
|
|
?batch_messages := MessagesIn
|
|
|
},
|
|
|
- #{db_shard := DBShard, latest := Latest} = State
|
|
|
+ #{db_shard := DBShard, latest := Latest0} = State
|
|
|
) ->
|
|
|
%% NOTE
|
|
|
%% Unique timestamp tracking real time closely.
|
|
|
%% With microsecond granularity it should be nearly impossible for it to run
|
|
|
%% too far ahead than the real time clock.
|
|
|
- {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
|
|
|
- %% TODO
|
|
|
- %% Batch is now reversed, but it should not make a lot of difference.
|
|
|
- %% Even if it would be in order, it's still possible to write messages far away
|
|
|
- %% in the past, i.e. when replica catches up with the leader. Storage layer
|
|
|
- %% currently relies on wall clock time to decide if it's safe to iterate over
|
|
|
- %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
|
|
|
- %% time instead.
|
|
|
+ {Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
|
|
|
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
|
|
|
- NState = State#{latest := NLatest},
|
|
|
+ NState = State#{latest := Latest},
|
|
|
%% TODO: Need to measure effects of changing frequency of `release_cursor`.
|
|
|
Effect = {release_cursor, RaftIdx, NState},
|
|
|
{NState, Result, Effect};
|
|
|
@@ -730,7 +723,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
|
|
|
assign_timestamps(Latest + 1, Rest, [Message | Acc])
|
|
|
end;
|
|
|
assign_timestamps(Latest, [], Acc) ->
|
|
|
- {Latest, Acc}.
|
|
|
+ {Latest, lists:reverse(Acc)}.
|
|
|
|
|
|
assign_timestamp(TimestampUs, Message) ->
|
|
|
{TimestampUs, Message}.
|