|
@@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
|
|
Value = make_message_value(Topic, MessagePayload),
|
|
Value = make_message_value(Topic, MessagePayload),
|
|
|
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
|
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
|
|
|
|
|
|
|
--spec make_iterator(db(), emqx_ds:replay()) ->
|
|
|
|
|
|
|
+-spec make_iterator(db(), emqx_ds_replay:replay()) ->
|
|
|
{ok, iterator()} | {error, _TODO}.
|
|
{ok, iterator()} | {error, _TODO}.
|
|
|
make_iterator(DB, Replay) ->
|
|
make_iterator(DB, Replay) ->
|
|
|
Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
|
|
Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
|
|
|
make_iterator(DB, Replay, Options).
|
|
make_iterator(DB, Replay, Options).
|
|
|
|
|
|
|
|
--spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
|
|
|
|
|
|
|
+-spec make_iterator(db(), emqx_ds_replay:replay(), iteration_options()) ->
|
|
|
% {error, invalid_start_time}? might just start from the beginning of time
|
|
% {error, invalid_start_time}? might just start from the beginning of time
|
|
|
% and call it a day: client violated the contract anyway.
|
|
% and call it a day: client violated the contract anyway.
|
|
|
{ok, iterator()} | {error, _TODO}.
|
|
{ok, iterator()} | {error, _TODO}.
|
|
@@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) ->
|
|
|
},
|
|
},
|
|
|
term_to_binary(State).
|
|
term_to_binary(State).
|
|
|
|
|
|
|
|
--spec restore_iterator(db(), emqx_ds:replay(), binary()) ->
|
|
|
|
|
|
|
+-spec restore_iterator(db(), emqx_ds_replay:replay(), binary()) ->
|
|
|
{ok, iterator()} | {error, _TODO}.
|
|
{ok, iterator()} | {error, _TODO}.
|
|
|
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
|
|
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
|
|
|
State = binary_to_term(Serial),
|
|
State = binary_to_term(Serial),
|
|
@@ -419,7 +419,7 @@ hash(Input, Bits) ->
|
|
|
% at most 32 bits
|
|
% at most 32 bits
|
|
|
erlang:phash2(Input, 1 bsl Bits).
|
|
erlang:phash2(Input, 1 bsl Bits).
|
|
|
|
|
|
|
|
--spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter().
|
|
|
|
|
|
|
+-spec make_keyspace_filter(emqx_ds_replay:replay(), keymapper()) -> keyspace_filter().
|
|
|
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
|
|
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
|
|
|
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
|
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
|
|
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
|
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|