Przeglądaj źródła

Merge pull request #13474 from keynslug/ft/EMQX-12309/ds-cas-api

feat(ds): allow isolated batches with preconditions
Andrew Mayorov 1 rok temu
rodzic
commit
3a893626b8

+ 27 - 15
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -202,7 +202,7 @@ store_batch(DB, Messages, Opts) ->
             {error, recoverable, Reason}
     end.
 
--record(bs, {options :: term()}).
+-record(bs, {options :: emqx_ds:create_db_opts()}).
 -type buffer_state() :: #bs{}.
 
 -spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}.
@@ -220,24 +220,36 @@ init_buffer(DB, Shard, Options) ->
 -spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) ->
     {buffer_state(), emqx_ds:store_batch_result()}.
 flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
-    {Latest, Batch} = assign_timestamps(current_timestamp({DB, Shard}), Messages),
-    Result = emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options),
-    emqx_ds_builtin_local_meta:set_current_timestamp({DB, Shard}, Latest),
+    ShardId = {DB, Shard},
+    ForceMonotonic = maps:get(force_monotonic_timestamps, Options),
+    {Latest, Batch} = make_batch(ForceMonotonic, current_timestamp(ShardId), Messages),
+    Result = emqx_ds_storage_layer:store_batch(ShardId, Batch, _Options = #{}),
+    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
     {S0, Result}.
 
-assign_timestamps(Latest, Messages) ->
-    assign_timestamps(Latest, Messages, []).
+make_batch(_ForceMonotonic = true, Latest, Messages) ->
+    assign_monotonic_timestamps(Latest, Messages, []);
+make_batch(false, Latest, Messages) ->
+    assign_message_timestamps(Latest, Messages, []).
 
-assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
-    case emqx_message:timestamp(MessageIn, microsecond) of
-        TimestampUs when TimestampUs > Latest ->
-            Message = assign_timestamp(TimestampUs, MessageIn),
-            assign_timestamps(TimestampUs, Rest, [Message | Acc]);
+assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
+    case emqx_message:timestamp(Message, microsecond) of
+        TimestampUs when TimestampUs > Latest0 ->
+            Latest = TimestampUs;
         _Earlier ->
-            Message = assign_timestamp(Latest + 1, MessageIn),
-            assign_timestamps(Latest + 1, Rest, [Message | Acc])
-    end;
-assign_timestamps(Latest, [], Acc) ->
+            Latest = Latest0 + 1
+    end,
+    Acc = [assign_timestamp(Latest, Message) | Acc0],
+    assign_monotonic_timestamps(Latest, Rest, Acc);
+assign_monotonic_timestamps(Latest, [], Acc) ->
+    {Latest, lists:reverse(Acc)}.
+
+assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
+    TimestampUs = emqx_message:timestamp(Message, microsecond),
+    Latest = max(TimestampUs, Latest0),
+    Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
+    assign_message_timestamps(Latest, Rest, Acc);
+assign_message_timestamps(Latest, [], Acc) ->
     {Latest, lists:reverse(Acc)}.
 
 assign_timestamp(TimestampUs, Message) ->

+ 0 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_meta.erl

@@ -151,7 +151,6 @@ ensure_monotonic_timestamp(ShardId) ->
 %%================================================================================
 
 -record(s, {}).
--define(timer_update, timer_update).
 
 init([]) ->
     process_flag(trap_exit, true),

+ 24 - 0
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -16,4 +16,28 @@
 -ifndef(EMQX_DS_HRL).
 -define(EMQX_DS_HRL, true).
 
+-record(dsbatch, {
+    operations :: [emqx_ds:operation()],
+    preconditions = [] :: [emqx_ds:precondition()]
+}).
+
+-record(message_matcher, {
+    %% Fields identifying the message:
+    %% Client identifier
+    from :: binary(),
+    %% Topic that the message is published to
+    topic :: emqx_types:topic(),
+    %% Timestamp (Unit: millisecond)
+    timestamp :: integer(),
+
+    %% Fields the message is matched against:
+    %% Message Payload
+    payload,
+    %% Message headers
+    headers = #{} :: emqx_types:headers(),
+    %% Extra filters
+    %% Reserved for the forward compatibility purposes.
+    filters = #{}
+}).
+
 -endif.

+ 56 - 13
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -19,6 +19,8 @@
 %% It takes care of forwarding calls to the underlying DBMS.
 -module(emqx_ds).
 
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
+
 %% Management API:
 -export([
     register_backend/2,
@@ -52,6 +54,9 @@
     time/0,
     topic_filter/0,
     topic/0,
+    batch/0,
+    operation/0,
+    precondition/0,
     stream/0,
     delete_stream/0,
     delete_selector/0,
@@ -84,8 +89,6 @@
 %% Type declarations
 %%================================================================================
 
--define(APP, emqx_durable_storage).
-
 -type db() :: atom().
 
 %% Parsed topic.
@@ -94,6 +97,32 @@
 %% Parsed topic filter.
 -type topic_filter() :: list(binary() | '+' | '#' | '').
 
+-type message() :: emqx_types:message().
+
+%% Message matcher.
+-type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
+
+%% A batch of storage operations.
+-type batch() :: [operation()] | #dsbatch{}.
+
+-type operation() ::
+    %% Store a message.
+    message()
+    %% Delete a message.
+    %% Does nothing if the message does not exist.
+    | {delete, message_matcher('_')}.
+
+%% Precondition.
+%% Fails whole batch if the storage already has the matching message (`if_exists'),
+%% or does not yet have (`unless_exists'). Here "matching" means that it either
+%% just exists (when pattern is '_') or has exactly the same payload, rest of the
+%% message fields are irrelevant.
+%% Useful to construct batches with "compare-and-set" semantics.
+%% Note: backends may not support this, but if they do only DBs with `atomic_batches'
+%% enabled are expected to support preconditions in batches.
+-type precondition() ::
+    {if_exists | unless_exists, message_matcher(iodata() | '_')}.
+
 -type rank_x() :: term().
 
 -type rank_y() :: integer().
@@ -157,18 +186,25 @@
         %% Whether to wait until the message storage has been acknowledged to return from
         %% `store_batch'.
         %% Default: `true'.
-        sync => boolean(),
-        %% Whether the whole batch given to `store_batch' should be inserted atomically as
-        %% a unit.  Note: the whole batch must be crafted so that it belongs to a single
-        %% shard (if applicable to the backend), as the batch will be split accordingly
-        %% even if this flag is `true'.
-        %% Default: `false'.
-        atomic => boolean()
+        sync => boolean()
     }.
 
 -type generic_db_opts() ::
     #{
         backend := atom(),
+        %% Force strictly monotonic message timestamps.
+        %% Default: `true'.
+        %% Messages are assigned unique, strictly monotonically increasing timestamps.
+        %% Those timestamps form a total order per each serialization key.
+        %% If `false' then message timestamps are respected; timestamp, topic and
+        %% serialization key uniquely identify a message.
+        force_monotonic_timestamps => boolean(),
+        %% Whether the whole batch given to `store_batch' should be processed and
+        %% inserted atomically as a unit, in isolation from other batches.
+        %% Default: `false'.
+        %% The whole batch must be crafted so that it belongs to a single shard (if
+        %% applicable to the backend).
+        atomic_batches => boolean(),
         serialize_by => clientid | topic,
         _ => _
     }.
@@ -261,7 +297,7 @@ open_db(DB, Opts = #{backend := Backend}) ->
         Module ->
             persistent_term:put(?persistent_term(DB), Module),
             emqx_ds_sup:register_db(DB, Backend),
-            ?module(DB):open_db(DB, Opts)
+            ?module(DB):open_db(DB, set_db_defaults(Opts))
     end.
 
 -spec close_db(db()) -> ok.
@@ -279,7 +315,7 @@ add_generation(DB) ->
 
 -spec update_db_config(db(), create_db_opts()) -> ok.
 update_db_config(DB, Opts) ->
-    ?module(DB):update_db_config(DB, Opts).
+    ?module(DB):update_db_config(DB, set_db_defaults(Opts)).
 
 -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
 list_generations_with_lifetimes(DB) ->
@@ -306,11 +342,11 @@ drop_db(DB) ->
             Module:drop_db(DB)
     end.
 
--spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
+-spec store_batch(db(), batch(), message_store_opts()) -> store_batch_result().
 store_batch(DB, Msgs, Opts) ->
     ?module(DB):store_batch(DB, Msgs, Opts).
 
--spec store_batch(db(), [emqx_types:message()]) -> store_batch_result().
+-spec store_batch(db(), batch()) -> store_batch_result().
 store_batch(DB, Msgs) ->
     store_batch(DB, Msgs, #{}).
 
@@ -410,6 +446,13 @@ timestamp_us() ->
 %% Internal functions
 %%================================================================================
 
+set_db_defaults(Opts) ->
+    Defaults = #{
+        force_monotonic_timestamps => true,
+        atomic_batches => false
+    },
+    maps:merge(Defaults, Opts).
+
 call_if_implemented(Mod, Fun, Args, Default) ->
     case erlang:function_exported(Mod, Fun, length(Args)) of
         true ->