|
|
@@ -26,7 +26,16 @@
|
|
|
%% internal exports:
|
|
|
-export([db_dir/1]).
|
|
|
|
|
|
--export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
|
|
|
+-export_type([
|
|
|
+ gen_id/0,
|
|
|
+ generation/0,
|
|
|
+ cf_refs/0,
|
|
|
+ stream/0,
|
|
|
+ iterator/0,
|
|
|
+ shard_id/0,
|
|
|
+ options/0,
|
|
|
+ prototype/0
|
|
|
+]).
|
|
|
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
@@ -64,7 +73,7 @@
|
|
|
?enc := term()
|
|
|
}.
|
|
|
|
|
|
-%% Note: this might be stored permanently on a remote node.
|
|
|
+%% Note: this might be stred permanently on a remote node.
|
|
|
-opaque iterator() ::
|
|
|
#{
|
|
|
?tag := ?IT,
|
|
|
@@ -109,17 +118,19 @@
|
|
|
%% Shard (runtime):
|
|
|
-type shard() :: shard(generation()).
|
|
|
|
|
|
+-type options() :: map().
|
|
|
+
|
|
|
%%================================================================================
|
|
|
%% Generation callbacks
|
|
|
%%================================================================================
|
|
|
|
|
|
%% Create the new schema given generation id and the options.
|
|
|
%% Create rocksdb column families.
|
|
|
--callback create(shard_id(), rocksdb:db_handle(), gen_id(), _Options) ->
|
|
|
+-callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) ->
|
|
|
{_Schema, cf_refs()}.
|
|
|
|
|
|
%% Open the existing schema
|
|
|
--callback open(shard_id(), rocsdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
|
|
+-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
|
|
_Data.
|
|
|
|
|
|
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
|
|
@@ -138,7 +149,7 @@
|
|
|
%% API for the replication layer
|
|
|
%%================================================================================
|
|
|
|
|
|
--spec open_shard(shard_id(), emqx_ds:builtin_db_opts()) -> ok.
|
|
|
+-spec open_shard(shard_id(), options()) -> ok.
|
|
|
open_shard(Shard, Options) ->
|
|
|
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
|
|
|
|
|
@@ -215,13 +226,13 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
|
|
|
|
|
|
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
|
|
|
|
|
--spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
|
|
|
+-spec start_link(shard_id(), options()) ->
|
|
|
{ok, pid()}.
|
|
|
start_link(Shard = {_, _}, Options) ->
|
|
|
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
|
|
|
|
|
|
-record(s, {
|
|
|
- shard_id :: emqx_ds:shard_id(),
|
|
|
+ shard_id :: shard_id(),
|
|
|
db :: rocksdb:db_handle(),
|
|
|
cf_refs :: cf_refs(),
|
|
|
schema :: shard_schema(),
|
|
|
@@ -352,7 +363,7 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
|
|
|
ok = put_schema_persistent(DB, Schema),
|
|
|
put_schema_runtime(ShardId, Runtime).
|
|
|
|
|
|
--spec rocksdb_open(shard_id(), emqx_ds:builtin_db_opts()) ->
|
|
|
+-spec rocksdb_open(shard_id(), options()) ->
|
|
|
{ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
|
|
|
rocksdb_open(Shard, Options) ->
|
|
|
DBOptions = [
|