Kaynağa Gözat

feat(ds): add atomic store API

Part of https://emqx.atlassian.net/browse/EMQX-11841
Thales Macedo Garitezi 1 yıl önce
ebeveyn
işleme
5d87d400f4

+ 10 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -150,7 +150,16 @@
 
 -type message_store_opts() ::
     #{
-        sync => boolean()
+        %% Whether to wait until the message storage has been acknowledged to return from
+        %% `store_batch'.
+        %% Default: `true'.
+        sync => boolean(),
+        %% Whether the whole batch given to `store_batch' should be inserted atomically as
+        %% a unit.  Note: the whole batch must be crafted so that it belongs to a single
+        %% shard (if applicable to the backend), as the batch will be split accordingly
+        %% even if this flag is `true'.
+        %% Default: `false'.
+        atomic => boolean()
     }.
 
 -type generic_db_opts() ::

+ 41 - 10
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -51,6 +51,7 @@
 -define(flush, flush).
 
 -record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
+-record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}).
 
 %%================================================================================
 %% API functions
@@ -64,13 +65,34 @@ start_link(DB, Shard) ->
     ok.
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    lists:foreach(
-        fun(Message) ->
-            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-            gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
-        end,
-        Messages
-    ).
+    case maps:get(atomic, Opts, false) of
+        false ->
+            lists:foreach(
+                fun(Message) ->
+                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                    gen_server:call(?via(DB, Shard), #enqueue_req{
+                        message = Message,
+                        sync = Sync
+                    })
+                end,
+                Messages
+            );
+        true ->
+            maps:foreach(
+                fun(Shard, Batch) ->
+                    gen_server:call(?via(DB, Shard), #enqueue_atomic_req{
+                        batch = Batch,
+                        sync = Sync
+                    })
+                end,
+                maps:groups_from_list(
+                    fun(Message) ->
+                        emqx_ds_replication_layer:shard_of_message(DB, Message, clientid)
+                    end,
+                    Messages
+                )
+            )
+    end.
 
 %%================================================================================
 %% behavior callbacks
@@ -101,6 +123,9 @@ init([DB, Shard]) ->
 
 handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
     do_enqueue(From, Sync, Msg, S);
+handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) ->
+    Len = length(Batch),
+    do_enqueue(From, Sync, {atomic, Len, Batch}, S);
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -131,7 +156,7 @@ do_flush(
     Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
     ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
     [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
+    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
     erlang:garbage_collect(),
     S#s{
         n = 0,
@@ -140,9 +165,15 @@ do_flush(
         tref = start_timer()
     }.
 
-do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
+do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
+    S1 =
+        case MsgOrBatch of
+            {atomic, NumMsgs, Msgs} ->
+                S0#s{n = N + NumMsgs, batch = Msgs ++ Batch};
+            Msg ->
+                S0#s{n = N + 1, batch = [Msg | Batch]}
+        end,
     S2 =
         case N >= NMax of
             true ->

+ 13 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -230,6 +230,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
     emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
+store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            {Key, _} = make_key(S, Msg),
+            Val = serialize(Msg),
+            rocksdb:batch_put(Batch, Data, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

+ 14 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -90,6 +90,20 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
     ok = rocksdb:drop_column_family(DBHandle, CFHandle),
     ok.
 
+store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            Id = erlang:unique_integer([monotonic]),
+            Key = <<Id:64>>,
+            Val = term_to_binary(Msg),
+            rocksdb:batch_put(Batch, CF, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

+ 66 - 0
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -307,6 +307,71 @@ t_08_smoke_list_drop_generation(_Config) ->
     ),
     ok.
 
+t_09_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_10_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 t_drop_generation_with_never_used_iterator(_Config) ->
     %% This test checks how the iterator behaves when:
     %%   1) it's created at generation 1 and not consumed from.
@@ -549,6 +614,7 @@ iterate(DB, It0, BatchSize, Acc) ->
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [mria, emqx_durable_storage],
         #{work_dir => ?config(priv_dir, Config)}

+ 64 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -219,6 +219,69 @@ t_replay(_Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
 
+t_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
     ExpectedFiltered = lists:filter(
         fun(#message{topic = Topic, timestamp = TS}) ->
@@ -418,6 +481,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 suite() -> [{timetrap, {seconds, 20}}].
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [emqx_durable_storage],
         #{work_dir => emqx_cth_suite:work_dir(Config)}