Explorar o código

feat(ds): two-stage storage commit on the storage level

ieQu1 hai 1 ano
pai
achega
2236af84ba

+ 1 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -733,7 +733,7 @@ apply(
     Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
     {State, Result};
 apply(
-    #{index := RaftIdx},
+    _RaftMeta,
     #{?tag := storage_event, ?payload := CustomEvent},
     #{db_shard := DBShard, latest := Latest0} = State
 ) ->
@@ -754,7 +754,6 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
     {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest),
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
-    set_ts(DBShard, Latest),
     handle_custom_event(DBShard, Timestamp, tick).
 
 assign_timestamps(Latest, Messages) ->

+ 91 - 16
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -28,7 +28,8 @@
     create/4,
     open/5,
     drop/5,
-    store_batch/4,
+    prepare_batch/4,
+    commit_batch/3,
     get_streams/4,
     get_delete_streams/4,
     make_iterator/5,
@@ -68,6 +69,9 @@
 -define(start_time, 3).
 -define(storage_key, 4).
 -define(last_seen_key, 5).
+-define(cooked_payloads, 6).
+-define(cooked_lts_ops, 7).
+-define(cooked_ts, 8).
 
 -type options() ::
     #{
@@ -90,18 +94,28 @@
     db :: rocksdb:db_handle(),
     data :: rocksdb:cf_handle(),
     trie :: emqx_ds_lts:trie(),
+    trie_cf :: rocksdb:cf_handle(),
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
     gvars :: ets:table()
 }).
 
+-define(lts_persist_ops, emqx_ds_storage_bitfield_lts_ops).
+
 -type s() :: #s{}.
 
 -type stream() :: emqx_ds_lts:msg_storage_key().
 
 -type delete_stream() :: emqx_ds_lts:msg_storage_key().
 
+%% Result of `prepare_batch/4', consumed by `commit_batch/3'.
+%%
+%% Payload values and LTS ops are carried as plain Erlang terms:
+%% `commit_batch/3' is the place where `term_to_binary/1' is applied to
+%% them right before they are written to RocksDB. Payload keys are used
+%% as RocksDB keys as-is.
+-type cooked_batch() ::
+    #{
+        ?cooked_payloads := [{binary(), term()}],
+        ?cooked_lts_ops := [{term(), term()}],
+        ?cooked_ts := integer()
+    }.
+
 -type iterator() ::
     #{
         ?tag := ?IT,
@@ -220,6 +234,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         db = DBHandle,
         data = DataCF,
         trie = Trie,
+        trie_cf = TrieCF,
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
@@ -257,24 +272,65 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
     ok = rocksdb:drop_column_family(DBHandle, TrieCF),
     ok.
 
--spec store_batch(
+-spec prepare_batch(
     emqx_ds_storage_layer:shard_id(),
     s(),
     [{emqx_ds:time(), emqx_types:message()}],
     emqx_ds:message_store_opts()
 ) ->
-    emqx_ds:store_batch_result().
-store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) ->
+    {ok, cooked_batch()}.
+prepare_batch(_ShardId, S, Messages, _Options) ->
+    %% Start from a clean slate: drop LTS ops that may still sit in the
+    %% process dictionary.
+    _ = erase(?lts_persist_ops),
+    %% Turns one message into a `{Key, Value}' payload while tracking the
+    %% largest timestamp seen so far. Messages are visited left to right.
+    CookOne = fun({Ts, Message}, MaxSoFar) ->
+        {Key, _} = make_key(S, Ts, Message),
+        {{Key, message_to_value_v1(Message)}, max(MaxSoFar, Ts)}
+    end,
+    {Payloads, MaxTs} = lists:mapfoldl(CookOne, 0, Messages),
+    %% Collect the LTS ops accumulated in the process dictionary while the
+    %% keys were generated (presumably via the trie persist callback --
+    %% confirm against make_key).
+    LtsOps = pop_lts_persist_ops(),
+    {ok, #{
+        ?cooked_payloads => Payloads,
+        ?cooked_lts_ops => LtsOps,
+        ?cooked_ts => MaxTs
+    }}.
+
+-spec commit_batch(
+    emqx_ds_storage_layer:shard_id(),
+    s(),
+    cooked_batch()
+) -> ok.
+commit_batch(
+    _ShardId,
+    _Data,
+    #{?cooked_payloads := [], ?cooked_lts_ops := LTS}
+) ->
+    %% Assert:
+    [] = LTS,
+    ok;
+commit_batch(
+    _ShardId,
+    #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
+    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}
+) ->
     {ok, Batch} = rocksdb:batch(),
-    MaxTs = lists:foldl(
-        fun({Timestamp, Msg}, Acc) ->
-            {Key, _} = make_key(S, Timestamp, Msg),
-            Val = serialize(Msg),
-            ok = rocksdb:put(DB, Data, Key, Val, []),
-            max(Acc, Timestamp)
+    %% Commit LTS trie to the storage:
+    lists:foreach(
+        fun({Key, Val}) ->
+            ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
+        end,
+        LtsOps
+    ),
+    %% Apply LTS ops to the memory cache:
+    _ = emqx_ds_lts:trie_update(Trie, LtsOps),
+    %% Commit payloads:
+    lists:foreach(
+        fun({Key, Val}) ->
+            ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val))
         end,
-        0,
-        Messages
+        Payloads
     ),
     Result = rocksdb:write_batch(DB, Batch, []),
     rocksdb:release_batch(Batch),
@@ -780,9 +836,6 @@ value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, E
         extra = Extra
     }.
 
-serialize(Msg) ->
-    term_to_binary(message_to_value_v1(Msg)).
-
 deserialize(Blob) ->
     value_v1_to_message(binary_to_term(Blob)).
 
@@ -810,7 +863,8 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(TopicIndexBytes, DB, CF) ->
     PersistCallback = fun(Key, Val) ->
-        rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), [])
+        push_lts_persist_op(Key, Val),
+        ok
     end,
     {ok, IT} = rocksdb:iterator(DB, CF, []),
     try
@@ -858,8 +912,29 @@ data_cf(GenId) ->
 trie_cf(GenId) ->
     "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId).
 
+%% Record one LTS persist operation in the process dictionary.
+%%
+%% Operations are prepended, so the accumulated list holds the most
+%% recently pushed operation first. Always returns `ok', as promised by
+%% the spec (`erlang:put/2' itself returns the previous value).
+-spec push_lts_persist_op(_Key, _Val) -> ok.
+push_lts_persist_op(Key, Val) ->
+    Pending =
+        case erlang:get(?lts_persist_ops) of
+            undefined ->
+                [];
+            L when is_list(L) ->
+                L
+        end,
+    _ = erlang:put(?lts_persist_ops, [{Key, Val} | Pending]),
+    ok.
+
+%% Remove the accumulated LTS persist operations from the process
+%% dictionary and return them; an empty list when nothing was pushed.
+-spec pop_lts_persist_ops() -> [{_Key, _Val}].
+pop_lts_persist_ops() ->
+    case erlang:erase(?lts_persist_ops) of
+        Ops when is_list(Ops) ->
+            Ops;
+        undefined ->
+            []
+    end.
+
 -ifdef(TEST).
 
+%% Test-only helper: convert a message to its v1 value form and encode it
+%% in the external term format.
+serialize(Msg) ->
+    Value = message_to_value_v1(Msg),
+    term_to_binary(Value).
+
 serialize_deserialize_test() ->
     Msg = #message{
         id = <<"message_id_val">>,

+ 65 - 13
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -26,6 +26,9 @@
 
     %% Data
     store_batch/3,
+    prepare_batch/3,
+    commit_batch/2,
+
     get_streams/3,
     get_delete_streams/3,
     make_iterator/4,
@@ -66,7 +69,8 @@
     shard_id/0,
     options/0,
     prototype/0,
-    post_creation_context/0
+    post_creation_context/0,
+    cooked_batch/0
 ]).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -86,6 +90,7 @@
 -define(STREAM, 1).
 -define(IT, 2).
 -define(DELETE_IT, 3).
+-define(COOKED_BATCH, 4).
 
 %% keys:
 -define(tag, 1).
@@ -132,6 +137,13 @@
         ?enc := term()
     }.
 
+-opaque cooked_batch() ::
+    #{
+        ?tag := ?COOKED_BATCH,
+        ?generation := gen_id(),
+        ?enc := term()
+    }.
+
 %%%% Generation:
 
 -define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
@@ -207,13 +219,19 @@
 -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
     ok | {error, _Reason}.
 
--callback store_batch(
+-callback prepare_batch(
     shard_id(),
     _Data,
-    [{emqx_ds:time(), emqx_types:message()}],
+    [{emqx_ds:time(), emqx_types:message()}, ...],
     emqx_ds:message_store_opts()
 ) ->
-    emqx_ds:store_batch_result().
+    {ok, term()} | {error, _}.
+
+-callback commit_batch(
+    shard_id(),
+    _Data,
+    _CookedBatch
+) -> ok.
 
 -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
     [_Stream].
@@ -261,20 +279,54 @@ drop_shard(Shard) ->
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
-store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
+store_batch(Shard, Messages, Options) ->
+    %% One-shot store: prepare the batch and, unless there is nothing to
+    %% write or preparation failed, commit it right away.
+    ?tp(emqx_ds_storage_layer_store_batch, #{
+        shard => Shard, messages => Messages, options => Options
+    }),
+    case prepare_batch(Shard, Messages, Options) of
+        ignore ->
+            ok;
+        {error, _} = Failure ->
+            Failure;
+        {ok, Cooked} ->
+            commit_batch(Shard, Cooked)
+    end.
+
+-spec prepare_batch(
+    shard_id(),
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
+) -> {ok, cooked_batch()} | ignore | {error, _}.
+prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
-    ?tp(emqx_ds_storage_layer_store_batch, #{
+    ?tp(emqx_ds_storage_layer_prepare_batch, #{
         shard => Shard, messages => Messages, options => Options
     }),
-    #{module := Mod, data := GenData} = generation_at(Shard, Time),
+    {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
     T0 = erlang:monotonic_time(microsecond),
-    Result = Mod:store_batch(Shard, GenData, Messages, Options),
+    Result =
+        case Mod:prepare_batch(Shard, GenData, Messages, Options) of
+            {ok, CookedBatch} ->
+                {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
+            Error = {error, _} ->
+                Error
+        end,
     T1 = erlang:monotonic_time(microsecond),
+    %% TODO: the prepare stage is reported through the store_batch timing
+    %% metric below; switch to a dedicated prepare metric (store->prepare).
     emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
     Result;
-store_batch(_Shard, [], _Options) ->
-    ok.
+prepare_batch(_Shard, [], _Options) ->
+    ignore.
+
+-spec commit_batch(shard_id(), cooked_batch()) -> ok.
+commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) ->
+    %% Resolve the generation by the id carried inside the cooked batch and
+    %% hand the backend-specific part over to that generation's module.
+    Schema = get_schema_runtime(Shard),
+    #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = Schema,
+    StartedAt = erlang:monotonic_time(microsecond),
+    Result = Mod:commit_batch(Shard, GenData, CookedBatch),
+    Elapsed = erlang:monotonic_time(microsecond) - StartedAt,
+    emqx_ds_builtin_metrics:observe_store_batch_time(Shard, Elapsed),
+    Result.
 
 -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{integer(), stream()}].
@@ -878,7 +930,7 @@ handle_accept_snapshot(ShardId) ->
 %% The mechanism of storage layer events should be refined later.
 -spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
 handle_event(Shard, Time, Event) ->
-    #{module := Mod, data := GenData} = generation_at(Shard, Time),
+    {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
     ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
     case erlang:function_exported(Mod, handle_event, 4) of
         true ->
@@ -919,7 +971,7 @@ generations_since(Shard, Since) ->
         Schema
     ).
 
--spec generation_at(shard_id(), emqx_ds:time()) -> generation().
+-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}.
 generation_at(Shard, Time) ->
     Schema = #{current_generation := Current} = get_schema_runtime(Shard),
     generation_at(Time, Current, Schema).
@@ -930,7 +982,7 @@ generation_at(Time, GenId, Schema) ->
         #{since := Since} when Time < Since andalso GenId > 0 ->
             generation_at(Time, prev_generation_id(GenId), Schema);
         _ ->
-            Gen
+            {GenId, Gen}
     end.
 
 -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).

+ 9 - 15
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -31,7 +31,8 @@
     create/4,
     open/5,
     drop/5,
-    store_batch/4,
+    prepare_batch/4,
+    commit_batch/3,
     get_streams/4,
     get_delete_streams/4,
     make_iterator/5,
@@ -101,12 +102,14 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
     ok = rocksdb:drop_column_family(DBHandle, CFHandle),
     ok.
 
-store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) ->
+%% No preprocessing is done at this stage: the cooked batch is the message
+%% list itself, returned unchanged.
+prepare_batch(_ShardId, _State, Batch, _Opts) ->
+    {ok, Batch}.
+
+commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
     {ok, Batch} = rocksdb:batch(),
     lists:foreach(
-        fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+        fun({TS, Msg}) ->
+            Key = <<TS:64>>,
             Val = term_to_binary(Msg),
             rocksdb:batch_put(Batch, CF, Key, Val)
         end,
@@ -114,16 +117,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
     ),
     Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
     rocksdb:release_batch(Batch),
-    Res;
-store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
-    lists:foreach(
-        fun({Timestamp, Msg}) ->
-            Key = <<Timestamp:64>>,
-            Val = term_to_binary(Msg),
-            rocksdb:put(DB, CF, Key, Val, [])
-        end,
-        Messages
-    ).
+    Res.
 
 get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
     [#stream{}].

+ 1 - 0
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -159,6 +159,7 @@ t_rebalance('end', Config) ->
 t_rebalance(Config) ->
     NMsgs = 50,
     NClients = 5,
+    NEvents = NMsgs * NClients,
     %% List of fake client IDs:
     Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
     %% List of streams that generate messages for each "client" in its own topic: