Explorar o código

feat(ds): support deletions + precondition-related API in bitfield-lts

Andrew Mayorov hai 1 ano
pai
achega
5502af18b7

+ 48 - 19
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -37,6 +37,7 @@
     update_iterator/4,
     next/6,
     delete_next/7,
+    lookup_message/3,
 
     handle_event/4
 ]).
@@ -46,6 +47,7 @@
 
 -export_type([options/0]).
 
+-include("emqx_ds.hrl").
 -include("emqx_ds_metrics.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
@@ -68,10 +70,13 @@
 -define(start_time, 3).
 -define(storage_key, 4).
 -define(last_seen_key, 5).
--define(cooked_payloads, 6).
+-define(cooked_msg_ops, 6).
 -define(cooked_lts_ops, 7).
 -define(cooked_ts, 8).
 
+%% atoms:
+-define(delete, 100).
+
 -type options() ::
     #{
         bits_per_wildcard_level => pos_integer(),
@@ -110,7 +115,7 @@
 
 -type cooked_batch() ::
     #{
-        ?cooked_payloads := [{binary(), binary()}],
+        ?cooked_msg_ops := [{binary(), binary() | ?delete}],
         ?cooked_lts_ops := [{binary(), binary()}],
         ?cooked_ts := integer()
     }.
@@ -271,24 +276,28 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
 -spec prepare_batch(
     emqx_ds_storage_layer:shard_id(),
     s(),
-    [{emqx_ds:time(), emqx_types:message()}, ...],
+    emqx_ds_storage_layer:batch(),
     emqx_ds_storage_layer:batch_store_opts()
 ) ->
     {ok, cooked_batch()}.
-prepare_batch(_ShardId, S, Messages, _Options) ->
+prepare_batch(_ShardId, S, Batch, _Options) ->
     _ = erase(?lts_persist_ops),
-    {Payloads, MaxTs} =
+    {Operations, MaxTs} =
         lists:mapfoldl(
-            fun({Timestamp, Msg}, Acc) ->
-                {Key, _} = make_key(S, Timestamp, Msg),
-                Payload = {Key, message_to_value_v1(Msg)},
-                {Payload, max(Acc, Timestamp)}
+            fun
+                ({Timestamp, Msg = #message{topic = Topic}}, Acc) ->
+                    {Key, _} = make_key(S, Timestamp, Topic),
+                    Op = {Key, message_to_value_v1(Msg)},
+                    {Op, max(Acc, Timestamp)};
+                ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}, Acc) ->
+                    {Key, _} = make_key(S, Timestamp, Topic),
+                    {_Op = {Key, ?delete}, Acc}
             end,
             0,
-            Messages
+            Batch
         ),
     {ok, #{
-        ?cooked_payloads => Payloads,
+        ?cooked_msg_ops => Operations,
         ?cooked_lts_ops => pop_lts_persist_ops(),
         ?cooked_ts => MaxTs
     }}.
@@ -302,7 +311,7 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
 commit_batch(
     _ShardId,
     _Data,
-    #{?cooked_payloads := [], ?cooked_lts_ops := LTS},
+    #{?cooked_msg_ops := [], ?cooked_lts_ops := LTS},
     _Options
 ) ->
     %% Assert:
@@ -311,7 +320,7 @@ commit_batch(
 commit_batch(
     _ShardId,
     #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
-    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
+    #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations, ?cooked_ts := MaxTs},
     Options
 ) ->
     {ok, Batch} = rocksdb:batch(),
@@ -326,10 +335,13 @@ commit_batch(
     _ = emqx_ds_lts:trie_update(Trie, LtsOps),
     %% Commit payloads:
     lists:foreach(
-        fun({Key, Val}) ->
-            ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val))
+        fun
+            ({Key, Val}) when is_tuple(Val) ->
+                ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val));
+            ({Key, ?delete}) ->
+                ok = rocksdb:batch_delete(Batch, DataCF, Key)
         end,
-        Payloads
+        Operations
     ),
     Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
     rocksdb:release_batch(Batch),
@@ -556,6 +568,23 @@ delete_next_until(
         rocksdb:iterator_close(ITHandle)
     end.
 
+-spec lookup_message(emqx_ds_storage_layer:shard_id(), s(), emqx_ds_precondition:matcher()) ->
+    emqx_types:message() | not_found | emqx_ds:error(_).
+lookup_message(
+    _ShardId,
+    S = #s{db = DB, data = CF},
+    #message_matcher{topic = Topic, timestamp = Timestamp}
+) ->
+    {Key, _} = make_key(S, Timestamp, Topic),
+    case rocksdb:get(DB, CF, Key, _ReadOpts = []) of
+        {ok, Blob} ->
+            deserialize(Blob);
+        not_found ->
+            not_found;
+        Error ->
+            {error, unrecoverable, {rocksdb, Error}}
+    end.
+
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     %% If the last message was published more than one epoch ago, and
     %% the shard remains idle, we need to advance safety cutoff
@@ -811,9 +840,9 @@ format_key(KeyMapper, Key) ->
     Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
--spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) ->
-    Tokens = emqx_topic:words(TopicBin),
+-spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
+make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
+    Tokens = emqx_topic:words(Topic),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -64,6 +64,7 @@
 -export_type([
     gen_id/0,
     generation/0,
+    batch/0,
     cf_refs/0,
     stream/0,
     delete_stream/0,