|
|
@@ -1133,23 +1133,106 @@ erase_schema_runtime(Shard) ->
|
|
|
|
|
|
-undef(PERSISTENT_TERM).
|
|
|
|
|
|
--define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>).
|
|
|
+-define(ROCKSDB_SCHEMA_KEY(V), <<"schema_", V>>).
|
|
|
+
|
|
|
+-define(ROCKSDB_SCHEMA_KEY, ?ROCKSDB_SCHEMA_KEY("v2")).
|
|
|
+-define(ROCKSDB_SCHEMA_KEYS, [
|
|
|
+ ?ROCKSDB_SCHEMA_KEY,
|
|
|
+ ?ROCKSDB_SCHEMA_KEY("v1")
|
|
|
+]).
|
|
|
|
|
|
-spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found.
|
|
|
get_schema_persistent(DB) ->
|
|
|
- case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of
|
|
|
+ get_schema_persistent(DB, ?ROCKSDB_SCHEMA_KEYS).
|
|
|
+
|
|
|
+get_schema_persistent(DB, [Key | Rest]) ->
|
|
|
+ case rocksdb:get(DB, Key, []) of
|
|
|
{ok, Blob} ->
|
|
|
- Schema = binary_to_term(Blob),
|
|
|
- %% Sanity check:
|
|
|
- #{current_generation := _, prototype := _} = Schema,
|
|
|
- Schema;
|
|
|
+ deserialize_schema(Key, Blob);
|
|
|
not_found ->
|
|
|
- not_found
|
|
|
- end.
|
|
|
+ get_schema_persistent(DB, Rest)
|
|
|
+ end;
|
|
|
+get_schema_persistent(_DB, []) ->
|
|
|
+ not_found.
|
|
|
|
|
|
-spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok.
|
|
|
put_schema_persistent(DB, Schema) ->
|
|
|
Blob = term_to_binary(Schema),
|
|
|
rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []).
|
|
|
|
|
|
+-spec deserialize_schema(_SchemaVsn :: binary(), binary()) -> shard_schema().
|
|
|
+deserialize_schema(SchemaVsn, Blob) ->
|
|
|
+ %% Sanity check:
|
|
|
+ Schema = #{current_generation := _, prototype := _} = binary_to_term(Blob),
|
|
|
+ decode_schema(SchemaVsn, Schema).
|
|
|
+
|
|
|
+decode_schema(?ROCKSDB_SCHEMA_KEY, Schema) ->
|
|
|
+ Schema;
|
|
|
+decode_schema(?ROCKSDB_SCHEMA_KEY("v1"), Schema) ->
|
|
|
+ maps:map(fun decode_schema_v1/2, Schema).
|
|
|
+
|
|
|
+decode_schema_v1(?GEN_KEY(_), Generation = #{}) ->
|
|
|
+ decode_generation_schema_v1(Generation);
|
|
|
+decode_schema_v1(_, V) ->
|
|
|
+ V.
|
|
|
+
|
|
|
+decode_generation_schema_v1(SchemaV1 = #{cf_refs := CFRefs}) ->
|
|
|
+ %% Drop potentially dead CF references from the time generation was created.
|
|
|
+ Schema = maps:remove(cf_refs, SchemaV1),
|
|
|
+ Schema#{cf_names => cf_names(CFRefs)};
|
|
|
+decode_generation_schema_v1(Schema = #{}) ->
|
|
|
+ Schema.
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------------------
|
|
|
+
|
|
|
+-ifdef(TEST).
|
|
|
+-include_lib("eunit/include/eunit.hrl").
|
|
|
+
|
|
|
+decode_schema_v1_test() ->
|
|
|
+ SchemaV1 = #{
|
|
|
+ current_generation => 42,
|
|
|
+ prototype => {emqx_ds_storage_reference, #{}},
|
|
|
+ ?GEN_KEY(41) => #{
|
|
|
+ module => emqx_ds_storage_reference,
|
|
|
+ data => {schema},
|
|
|
+ cf_refs => [{"emqx_ds_storage_reference41", erlang:make_ref()}],
|
|
|
+ created_at => 12345,
|
|
|
+ since => 0,
|
|
|
+ until => 123456
|
|
|
+ },
|
|
|
+ ?GEN_KEY(42) => #{
|
|
|
+ module => emqx_ds_storage_reference,
|
|
|
+ data => {schema},
|
|
|
+ cf_refs => [{"emqx_ds_storage_reference42", erlang:make_ref()}],
|
|
|
+ created_at => 54321,
|
|
|
+ since => 123456,
|
|
|
+ until => undefined
|
|
|
+ }
|
|
|
+ },
|
|
|
+ ?assertEqual(
|
|
|
+ #{
|
|
|
+ current_generation => 42,
|
|
|
+ prototype => {emqx_ds_storage_reference, #{}},
|
|
|
+ ?GEN_KEY(41) => #{
|
|
|
+ module => emqx_ds_storage_reference,
|
|
|
+ data => {schema},
|
|
|
+ cf_names => ["emqx_ds_storage_reference41"],
|
|
|
+ created_at => 12345,
|
|
|
+ since => 0,
|
|
|
+ until => 123456
|
|
|
+ },
|
|
|
+ ?GEN_KEY(42) => #{
|
|
|
+ module => emqx_ds_storage_reference,
|
|
|
+ data => {schema},
|
|
|
+ cf_names => ["emqx_ds_storage_reference42"],
|
|
|
+ created_at => 54321,
|
|
|
+ since => 123456,
|
|
|
+ until => undefined
|
|
|
+ }
|
|
|
+ },
|
|
|
+ deserialize_schema(?ROCKSDB_SCHEMA_KEY("v1"), term_to_binary(SchemaV1))
|
|
|
+ ).
|
|
|
+
|
|
|
+-endif.
|
|
|
+
|
|
|
-undef(ROCKSDB_SCHEMA_KEY).
|