|
|
@@ -21,7 +21,7 @@
|
|
|
%% used for testing.
|
|
|
-module(emqx_ds_storage_bitfield_lts).
|
|
|
|
|
|
--behavior(emqx_ds_storage_layer).
|
|
|
+-behaviour(emqx_ds_storage_layer).
|
|
|
|
|
|
%% API:
|
|
|
-export([]).
|
|
|
@@ -65,6 +65,8 @@
|
|
|
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper())
|
|
|
}).
|
|
|
|
|
|
+-type s() :: #s{}.
|
|
|
+
|
|
|
-record(stream, {
|
|
|
storage_key :: emqx_ds_lts:msg_storage_key()
|
|
|
}).
|
|
|
@@ -76,9 +78,7 @@
|
|
|
last_seen_key = <<>> :: binary()
|
|
|
}).
|
|
|
|
|
|
--define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER),
|
|
|
- ((KEY band BITMASK) =:= BITFILTER)
|
|
|
-).
|
|
|
+-type iterator() :: #it{}.
|
|
|
|
|
|
-define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
|
|
|
|
|
|
@@ -92,6 +92,13 @@
|
|
|
%% behavior callbacks
|
|
|
%%================================================================================
|
|
|
|
|
|
+-spec create(
|
|
|
+ emqx_ds_replication_layer:shard_id(),
|
|
|
+ rocksdb:db_handle(),
|
|
|
+ emqx_ds_storage_layer:gen_id(),
|
|
|
+ options()
|
|
|
+) ->
|
|
|
+ {schema(), emqx_ds_storage_layer:cf_refs()}.
|
|
|
create(_ShardId, DBHandle, GenId, Options) ->
|
|
|
%% Get options:
|
|
|
BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
|
|
|
@@ -112,6 +119,14 @@ create(_ShardId, DBHandle, GenId, Options) ->
|
|
|
},
|
|
|
{Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
|
|
|
|
|
|
+-spec open(
|
|
|
+ emqx_ds_replication_layer:shard_id(),
|
|
|
+ rocksdb:db_handle(),
|
|
|
+ emqx_ds_storage_layer:gen_id(),
|
|
|
+ emqx_ds_storage_layer:cf_refs(),
|
|
|
+ schema()
|
|
|
+) ->
|
|
|
+ s().
|
|
|
open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|
|
|
#{
|
|
|
bits_per_wildcard_level := BitsPerTopicLevel,
|
|
|
@@ -134,6 +149,10 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|
|
|
),
|
|
|
#s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}.
|
|
|
|
|
|
+-spec store_batch(
|
|
|
+ emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
|
|
|
+) ->
|
|
|
+ emqx_ds:store_batch_result().
|
|
|
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
|
|
|
lists:foreach(
|
|
|
fun(Msg) ->
|
|
|
@@ -203,7 +222,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
|
|
|
%% Internal functions
|
|
|
%%================================================================================
|
|
|
|
|
|
-next_loop(ITHandle, KeyMapper, Filter, It, Acc, 0) ->
|
|
|
+next_loop(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) ->
|
|
|
{ok, It, lists:reverse(Acc)};
|
|
|
next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) ->
|
|
|
inc_counter(),
|
|
|
@@ -249,8 +268,8 @@ traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) ->
|
|
|
{0, It0, Acc}
|
|
|
end.
|
|
|
|
|
|
--spec check_message(emqx_ds_bitmask_keymapper:filter(), #it{}, binary()) ->
|
|
|
- {true, #message{}} | false.
|
|
|
+-spec check_message(emqx_ds_bitmask_keymapper:filter(), iterator(), binary()) ->
|
|
|
+ {true, emqx_types:message()} | false.
|
|
|
check_message(Filter, #it{last_seen_key = Key}, Val) ->
|
|
|
case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
|
|
|
true ->
|
|
|
@@ -270,7 +289,7 @@ format_keyfilter(any) ->
|
|
|
format_keyfilter({Op, Val}) ->
|
|
|
{Op, integer_to_list(Val, 16)}.
|
|
|
|
|
|
--spec make_key(#s{}, #message{}) -> {binary(), [binary()]}.
|
|
|
+-spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
|
|
|
make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
|
|
|
Tokens = emqx_topic:tokens(TopicBin),
|
|
|
{TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
|
|
|
@@ -345,7 +364,7 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) ->
|
|
|
{binary_to_term(KeyB), binary_to_term(ValB)}
|
|
|
| read_persisted_trie(IT, rocksdb:iterator_move(IT, next))
|
|
|
];
|
|
|
-read_persisted_trie(IT, {error, invalid_iterator}) ->
|
|
|
+read_persisted_trie(_IT, {error, invalid_iterator}) ->
|
|
|
[].
|
|
|
|
|
|
inc_counter() ->
|