Explorar el Código

feat(dslocal): implement `force_monotonic_timestamps => false`

Andrew Mayorov hace 1 año
padre
commit
02e1007a16

+ 27 - 15
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -202,7 +202,7 @@ store_batch(DB, Messages, Opts) ->
             {error, recoverable, Reason}
     end.
 
--record(bs, {options :: term()}).
+-record(bs, {options :: emqx_ds:create_db_opts()}).
 -type buffer_state() :: #bs{}.
 
 -spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}.
@@ -220,24 +220,36 @@ init_buffer(DB, Shard, Options) ->
 -spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) ->
     {buffer_state(), emqx_ds:store_batch_result()}.
 flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
-    {Latest, Batch} = assign_timestamps(current_timestamp({DB, Shard}), Messages),
-    Result = emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options),
-    emqx_ds_builtin_local_meta:set_current_timestamp({DB, Shard}, Latest),
+    ShardId = {DB, Shard},
+    ForceMonotonic = maps:get(force_monotonic_timestamps, Options),
+    {Latest, Batch} = make_batch(ForceMonotonic, current_timestamp(ShardId), Messages),
+    Result = emqx_ds_storage_layer:store_batch(ShardId, Batch, _Options = #{}),
+    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
     {S0, Result}.
 
-assign_timestamps(Latest, Messages) ->
-    assign_timestamps(Latest, Messages, []).
+make_batch(_ForceMonotonic = true, Latest, Messages) ->
+    assign_monotonic_timestamps(Latest, Messages, []);
+make_batch(false, Latest, Messages) ->
+    assign_message_timestamps(Latest, Messages, []).
 
-assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
-    case emqx_message:timestamp(MessageIn, microsecond) of
-        TimestampUs when TimestampUs > Latest ->
-            Message = assign_timestamp(TimestampUs, MessageIn),
-            assign_timestamps(TimestampUs, Rest, [Message | Acc]);
+assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
+    case emqx_message:timestamp(Message, microsecond) of
+        TimestampUs when TimestampUs > Latest0 ->
+            Latest = TimestampUs;
         _Earlier ->
-            Message = assign_timestamp(Latest + 1, MessageIn),
-            assign_timestamps(Latest + 1, Rest, [Message | Acc])
-    end;
-assign_timestamps(Latest, [], Acc) ->
+            Latest = Latest0 + 1
+    end,
+    Acc = [assign_timestamp(Latest, Message) | Acc0],
+    assign_monotonic_timestamps(Latest, Rest, Acc);
+assign_monotonic_timestamps(Latest, [], Acc) ->
+    {Latest, lists:reverse(Acc)}.
+
+assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
+    TimestampUs = emqx_message:timestamp(Message, microsecond),
+    Latest = max(TimestampUs, Latest0),
+    Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
+    assign_message_timestamps(Latest, Rest, Acc);
+assign_message_timestamps(Latest, [], Acc) ->
     {Latest, lists:reverse(Acc)}.
 
 assign_timestamp(TimestampUs, Message) ->

+ 6 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -268,7 +268,7 @@ open_db(DB, Opts = #{backend := Backend}) ->
         Module ->
             persistent_term:put(?persistent_term(DB), Module),
             emqx_ds_sup:register_db(DB, Backend),
-            ?module(DB):open_db(DB, Opts)
+            ?module(DB):open_db(DB, set_db_defaults(Opts))
     end.
 
 -spec close_db(db()) -> ok.
@@ -286,7 +286,7 @@ add_generation(DB) ->
 
 -spec update_db_config(db(), create_db_opts()) -> ok.
 update_db_config(DB, Opts) ->
-    ?module(DB):update_db_config(DB, Opts).
+    ?module(DB):update_db_config(DB, set_db_defaults(Opts)).
 
 -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
 list_generations_with_lifetimes(DB) ->
@@ -417,6 +417,10 @@ timestamp_us() ->
 %% Internal functions
 %%================================================================================
 
+set_db_defaults(Opts) ->
+    Defaults = #{force_monotonic_timestamps => true},
+    maps:merge(Defaults, Opts).
+
 call_if_implemented(Mod, Fun, Args, Default) ->
     case erlang:function_exported(Mod, Fun, length(Args)) of
         true ->