|
|
@@ -105,7 +105,7 @@
|
|
|
hash/2
|
|
|
]).
|
|
|
|
|
|
--export_type([db/0, iterator/0]).
|
|
|
+-export_type([db/0, iterator/0, schema/0]).
|
|
|
|
|
|
-compile({inline, [ones/1, bitwise_concat/3]}).
|
|
|
|
|
|
@@ -131,9 +131,12 @@
|
|
|
-type bits_per_level() :: [bits(), ...].
|
|
|
|
|
|
-type options() :: #{
|
|
|
- %% Keymapper.
|
|
|
- keymapper := keymapper(),
|
|
|
- %% Name and options to use to open specific column family.
|
|
|
+ %% Number of bits in a message timestamp.
|
|
|
+ timestamp_bits := bits(),
|
|
|
+ %% Number of bits in a key allocated to each level in a message topic.
|
|
|
+ topic_bits_per_level := bits_per_level(),
|
|
|
+ %% Maximum granularity of iteration over time.
|
|
|
+ epoch := time(),
|
|
|
cf_options => emqx_replay_local_store:db_cf_options()
|
|
|
}.
|
|
|
|
|
|
@@ -151,7 +154,7 @@
|
|
|
%% record when the database is reopened
|
|
|
-record(schema, {keymapper :: keymapper()}).
|
|
|
|
|
|
--type schema() :: #schema{}.
|
|
|
+-opaque schema() :: #schema{}.
|
|
|
|
|
|
-record(db, {
|
|
|
handle :: rocksdb:db_handle(),
|
|
|
@@ -196,8 +199,9 @@
|
|
|
%%================================================================================
|
|
|
|
|
|
%% Create a new column family for the generation and a serializable representation of the schema
|
|
|
--spec create_new(rocksdb:db_handle(), emqx_replay_local_store:generation_id(), options()) ->
|
|
|
+-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) ->
|
|
|
{schema(), emqx_replay_local_store:cf_refs()}.
|
|
|
+%{schema(), emqx_replay_local_store:cf_refs()}.
|
|
|
create_new(DBHandle, GenId, Options) ->
|
|
|
CFName = data_cf(GenId),
|
|
|
CFOptions = maps:get(cf_options, Options, []),
|
|
|
@@ -208,30 +212,20 @@ create_new(DBHandle, GenId, Options) ->
|
|
|
%% Reopen the database
|
|
|
-spec open(
|
|
|
rocksdb:db_handle(),
|
|
|
- emqx_replay_local_store:generation_id(),
|
|
|
- [{_CFName :: string(), _CFHandle :: reference()}],
|
|
|
+ emqx_replay_local_store:gen_id(),
|
|
|
+ emqx_replay_local_store:cf_refs(),
|
|
|
schema()
|
|
|
) ->
|
|
|
db().
|
|
|
open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
|
|
- CFHandle = proplists:get_value(data_cf(GenId), CFs),
|
|
|
- % assert
|
|
|
- true = is_reference(CFHandle),
|
|
|
+ {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
|
|
|
#db{
|
|
|
handle = DBHandle,
|
|
|
cf = CFHandle,
|
|
|
keymapper = Keymapper
|
|
|
}.
|
|
|
|
|
|
--spec make_keymapper(Options) -> keymapper() when
|
|
|
- Options :: #{
|
|
|
- %% Number of bits in a message timestamp.
|
|
|
- timestamp_bits := bits(),
|
|
|
- %% Number of bits in a key allocated to each level in a message topic.
|
|
|
- topic_bits_per_level := bits_per_level(),
|
|
|
- %% Maximum granularity of iteration over time.
|
|
|
- epoch := time()
|
|
|
- }.
|
|
|
+-spec make_keymapper(options()) -> keymapper().
|
|
|
make_keymapper(#{
|
|
|
timestamp_bits := TimestampBits,
|
|
|
topic_bits_per_level := BitsPerLevel,
|
|
|
@@ -313,7 +307,7 @@ make_message_value(Topic, MessagePayload) ->
|
|
|
unwrap_message_value(Binary) ->
|
|
|
binary_to_term(Binary).
|
|
|
|
|
|
--spec combine(_Bitstring :: integer(), emqx_guid:guid(), keymapper()) ->
|
|
|
+-spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) ->
|
|
|
key().
|
|
|
combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
|
|
|
<<Bitstring:Size/integer, MessageID/binary>>.
|
|
|
@@ -521,7 +515,7 @@ substring(I, Offset, Size) ->
|
|
|
(I bsr Offset) band ones(Size).
|
|
|
|
|
|
%% @doc Generate a column family ID for the MQTT messages
|
|
|
--spec data_cf(emqx_replay_local_store:gen_id()) -> string().
|
|
|
+-spec data_cf(emqx_replay_local_store:gen_id()) -> [char()].
|
|
|
data_cf(GenId) ->
|
|
|
?MODULE_STRING ++ integer_to_list(GenId).
|
|
|
|