Bläddra i källkod

Merge pull request #13593 from thalesmg/20240808-r58-builtin-local-preconditions

feat(ds builtin local): add basic support for atomic batches + preconditions
Thales Macedo Garitezi 1 år sedan
förälder
incheckning
9d6954cf60

+ 2 - 11
apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl

@@ -689,18 +689,9 @@ all() ->
 
 
 groups() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     TCs = emqx_common_test_helpers:all(?MODULE),
-    %% TODO: Remove once builtin-local supports preconditions + atomic batches.
-    BuiltinLocalTCs =
-        TCs --
-            [
-                t_09_atomic_store_batch,
-                t_11_batch_preconditions,
-                t_12_batch_precondition_conflicts
-            ],
-    BuiltinRaftTCs = TCs,
     [
     [
-        {builtin_local, BuiltinLocalTCs},
-        {builtin_raft, BuiltinRaftTCs}
+        {builtin_local, TCs},
+        {builtin_raft, TCs}
     ].
     ].
 
 
 init_per_group(builtin_local, Config) ->
 init_per_group(builtin_local, Config) ->

+ 45 - 4
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -49,7 +49,9 @@
 %% Internal exports:
 %% Internal exports:
 -export([
 -export([
     do_next/3,
     do_next/3,
-    do_delete_next/4
+    do_delete_next/4,
+    %% Used by batch serializer
+    make_batch/3
 ]).
 ]).
 
 
 -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
 -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
@@ -88,7 +90,10 @@
     #{
     #{
         backend := builtin_local,
         backend := builtin_local,
         storage := emqx_ds_storage_layer:prototype(),
         storage := emqx_ds_storage_layer:prototype(),
-        n_shards := pos_integer()
+        n_shards := pos_integer(),
+        %% Inherited from `emqx_ds:generic_db_opts()`.
+        force_monotonic_timestamps => boolean(),
+        atomic_batches => boolean()
     }.
     }.
 
 
 -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
 -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
@@ -193,9 +198,17 @@ drop_db(DB) ->
     ),
     ),
     emqx_ds_builtin_local_meta:drop_db(DB).
     emqx_ds_builtin_local_meta:drop_db(DB).
 
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
     emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
+store_batch(DB, Batch, Opts) ->
+    case emqx_ds_builtin_local_meta:db_config(DB) of
+        #{atomic_batches := true} ->
+            store_batch_atomic(DB, Batch, Opts);
+        _ ->
+            store_batch_buffered(DB, Batch, Opts)
+    end.
+
+store_batch_buffered(DB, Messages, Opts) ->
     try
     try
         emqx_ds_buffer:store_batch(DB, Messages, Opts)
         emqx_ds_buffer:store_batch(DB, Messages, Opts)
     catch
     catch
@@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) ->
             {error, recoverable, Reason}
             {error, recoverable, Reason}
     end.
     end.
 
 
+store_batch_atomic(DB, Batch, Opts) ->
+    Shards = shards_of_batch(DB, Batch),
+    case Shards of
+        [Shard] ->
+            emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts);
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, unrecoverable, atomic_batch_spans_multiple_shards}
+    end.
+
+shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
+    shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
+shards_of_batch(DB, Operations) ->
+    shards_of_batch(DB, Operations, []).
+
+shards_of_batch(DB, [Operation | Rest], Acc) ->
+    case shard_of_operation(DB, Operation, clientid, #{}) of
+        Shard when Shard =:= hd(Acc) ->
+            shards_of_batch(DB, Rest, Acc);
+        Shard when Acc =:= [] ->
+            shards_of_batch(DB, Rest, [Shard]);
+        ShardAnother ->
+            [ShardAnother | Acc]
+    end;
+shards_of_batch(_DB, [], Acc) ->
+    Acc.
+
 -record(bs, {options :: emqx_ds:create_db_opts()}).
 -record(bs, {options :: emqx_ds:create_db_opts()}).
 -type buffer_state() :: #bs{}.
 -type buffer_state() :: #bs{}.
 
 

+ 122 - 0
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_batch_serializer.erl

@@ -0,0 +1,122 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_builtin_local_batch_serializer).
+
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
+
+%% API
+-export([
+    start_link/3,
+
+    store_batch_atomic/4
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
+-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
+
+-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link(DB, Shard, _Opts) ->
+    gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
+
+store_batch_atomic(DB, Shard, Batch, Opts) ->
+    gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init([DB, Shard]) ->
+    process_flag(message_queue_data, off_heap),
+    State = #{
+        db => DB,
+        shard => Shard
+    },
+    {ok, State}.
+
+handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) ->
+    ShardId = shard_id(State),
+    DBOpts = db_config(State),
+    Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts),
+    {reply, Result, State};
+handle_call(Call, _From, State) ->
+    {reply, {error, {unknown_call, Call}}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+shard_id(#{db := DB, shard := Shard}) ->
+    {DB, Shard}.
+
+db_config(#{db := DB}) ->
+    emqx_ds_builtin_local_meta:db_config(DB).
+
+-spec do_store_batch_atomic(
+    emqx_ds_storage_layer:shard_id(),
+    emqx_ds:dsbatch(),
+    emqx_ds_builtin_local:db_opts(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) ->
+    #dsbatch{
+        operations = Operations0,
+        preconditions = Preconditions
+    } = Batch,
+    case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of
+        ok ->
+            do_store_operations(ShardId, Operations0, DBOpts, StoreOpts);
+        {precondition_failed, _} = PreconditionFailed ->
+            {error, unrecoverable, PreconditionFailed};
+        Error ->
+            Error
+    end;
+do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) ->
+    do_store_operations(ShardId, Operations, DBOpts, StoreOpts).
+
+do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) ->
+    ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts),
+    {Latest, Operations} =
+        emqx_ds_builtin_local:make_batch(
+            ForceMonotonic,
+            current_timestamp(ShardId),
+            Operations0
+        ),
+    Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}),
+    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
+    Result.
+
+current_timestamp(ShardId) ->
+    emqx_ds_builtin_local_meta:current_timestamp(ShardId).

+ 11 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local_db_sup.erl

@@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Opts = emqx_ds_builtin_local_meta:db_config(DB),
     Opts = emqx_ds_builtin_local_meta:db_config(DB),
     Children = [
     Children = [
         shard_storage_spec(DB, Shard, Opts),
         shard_storage_spec(DB, Shard, Opts),
-        shard_buffer_spec(DB, Shard, Opts)
+        shard_buffer_spec(DB, Shard, Opts),
+        shard_batch_serializer_spec(DB, Shard, Opts)
     ],
     ],
     {ok, {SupFlags, Children}}.
     {ok, {SupFlags, Children}}.
 
 
@@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) ->
         type => worker
         type => worker
     }.
     }.
 
 
+shard_batch_serializer_spec(DB, Shard, Opts) ->
+    #{
+        id => {Shard, batch_serializer},
+        start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]},
+        shutdown => 5_000,
+        restart => permanent,
+        type => worker
+    }.
+
 ensure_started(Res) ->
 ensure_started(Res) ->
     case Res of
     case Res of
         {ok, _Pid} ->
         {ok, _Pid} ->

+ 4 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -55,6 +55,7 @@
     topic_filter/0,
     topic_filter/0,
     topic/0,
     topic/0,
     batch/0,
     batch/0,
+    dsbatch/0,
     operation/0,
     operation/0,
     deletion/0,
     deletion/0,
     precondition/0,
     precondition/0,
@@ -104,7 +105,9 @@
 -type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
 -type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
 
 
 %% A batch of storage operations.
 %% A batch of storage operations.
--type batch() :: [operation()] | #dsbatch{}.
+-type batch() :: [operation()] | dsbatch().
+
+-type dsbatch() :: #dsbatch{}.
 
 
 -type operation() ::
 -type operation() ::
     %% Store a message.
     %% Store a message.